Fledge
An open source edge computing platform for industrial users
storage_registry.h
1 #ifndef _STORAGE_REGISTRY_H
2 #define _STORAGE_REGISTRY_H
3 
4 #include <vector>
5 #include <queue>
6 #include <string>
7 #include <mutex>
8 #include <condition_variable>
9 #include <thread>
10 #include <map>
11 
17 #define MAX_REFUSALS 3
18 
19 typedef std::vector<std::pair<std::string *, std::string *> > REGISTRY;
20 
21 typedef struct {
22  std::string url;
23  std::string key;
24  std::vector<std::string> keyValues;
25  std::string operation;
27 
28 typedef std::vector<std::pair<std::string *, TableRegistration *> > REGISTRY_TABLE;
29 
30 
37  public:
40  void registerAsset(const std::string& asset, const std::string& url);
41  void unregisterAsset(const std::string& asset, const std::string& url);
42  void process(const std::string& payload);
43  void processTableInsert(const std::string& tableName, const std::string& payload);
44  void processTableUpdate(const std::string& tableName, const std::string& payload);
45  void processTableDelete(const std::string& tableName, const std::string& payload);
46  void registerTable(const std::string& table, const std::string& url);
47  void unregisterTable(const std::string& table, const std::string& url);
48  void run();
49  private:
50  void processPayload(char *payload);
51  void sendPayload(const std::string& url, const char *payload);
52  void filterPayload(const std::string& url, char *payload, const std::string& asset);
53  void processInsert(char *tableName, char *payload);
54  void processUpdate(char *tableName, char *payload);
55  void processDelete(char *tableName, char *payload);
57  parseTableSubscriptionPayload(const std::string& payload);
58  void processAssetRefusals();
59  void processTableRefusals();
60  void insertTestTableReg();
61  void removeTestTableReg(int n);
62 
63  typedef std::pair<time_t, char *> Item;
64  typedef std::tuple<time_t, char *, char *> TableItem;
65  REGISTRY m_registrations;
66  REGISTRY_TABLE m_tableRegistrations;
67 
68  std::queue<StorageRegistry::Item>
69  m_queue;
70  std::queue<StorageRegistry::TableItem>
71  m_tableInsertQueue;
72  std::queue<StorageRegistry::TableItem>
73  m_tableUpdateQueue;
74  std::queue<StorageRegistry::TableItem>
75  m_tableDeleteQueue;
76  std::mutex m_qMutex;
77  std::mutex m_registrationsMutex;
78  std::mutex m_tableRegistrationsMutex;
79  std::thread *m_thread;
80  std::condition_variable m_cv;
81  std::mutex m_cvMutex;
82  bool m_running;
83  std::map<const std::string, int> m_refusals;
84 };
85 
86 #endif
StorageRegistry::unregisterTable
void unregisterTable(const std::string &table, const std::string &url)
Handle a request to remove a registration of interest in a table.
Definition: storage_registry.cpp:335
StorageRegistry::registerTable
void registerTable(const std::string &table, const std::string &url)
Handle a registration request for a table from a client of the storage layer.
Definition: storage_registry.cpp:312
StorageRegistry::processTableInsert
void processTableInsert(const std::string &tableName, const std::string &payload)
Process a table insert payload and determine if any microservice has registered an interest in this t...
Definition: storage_registry.cpp:123
StorageRegistry::run
void run()
The worker function that processes the queue of payloads that may need to be sent to subscribers.
Definition: storage_registry.cpp:392
StorageRegistry::registerAsset
void registerAsset(const std::string &asset, const std::string &url)
Handle a registration request from a client of the storage layer.
Definition: storage_registry.cpp:221
StorageRegistry::processTableUpdate
void processTableUpdate(const std::string &tableName, const std::string &payload)
Process a table update payload and determine if any microservice has registered an interest in this t...
Definition: storage_registry.cpp:156
StorageRegistry::process
void process(const std::string &payload)
Process a reading append payload and determine if any microservice has registered an interest in this...
Definition: storage_registry.cpp:94
StorageRegistry
StorageRegistry - a class that manages requests from other microservices to register interest in new ...
Definition: storage_registry.h:36
StorageRegistry::~StorageRegistry
~StorageRegistry()
StorageRegistry destructor.
Definition: storage_registry.cpp:65
TableRegistration
Definition: storage_registry.h:21
StorageRegistry::StorageRegistry
StorageRegistry()
StorageRegistry constructor.
Definition: storage_registry.cpp:56
StorageRegistry::unregisterAsset
void unregisterAsset(const std::string &asset, const std::string &url)
Handle a request to remove a registration of interest.
Definition: storage_registry.cpp:234
StorageRegistry::processTableDelete
void processTableDelete(const std::string &tableName, const std::string &payload)
Process a table delete payload and determine if any microservice has registered an interest in this t...
Definition: storage_registry.cpp:189