Fledge
An open source edge computing platform for industrial users
storage_registry.h
1 #ifndef _STORAGE_REGISTRY_H
2 #define _STORAGE_REGISTRY_H
3 
4 #include <vector>
5 #include <queue>
6 #include <string>
7 #include <mutex>
8 #include <condition_variable>
9 #include <thread>
10 
11 typedef std::vector<std::pair<std::string *, std::string *> > REGISTRY;
12 
13 typedef struct {
14  std::string url;
15  std::string key;
16  std::vector<std::string> keyValues;
17  std::string operation;
18 } TableRegistration;
19 
20 typedef std::vector<std::pair<std::string *, TableRegistration *> > REGISTRY_TABLE;
21 
22 
28 class StorageRegistry {
29  public:
30  StorageRegistry();
31  ~StorageRegistry();
32  void registerAsset(const std::string& asset, const std::string& url);
33  void unregisterAsset(const std::string& asset, const std::string& url);
34  void process(const std::string& payload);
35  void processTableInsert(const std::string& tableName, const std::string& payload);
36  void processTableUpdate(const std::string& tableName, const std::string& payload);
37  void processTableDelete(const std::string& tableName, const std::string& payload);
38  void registerTable(const std::string& table, const std::string& url);
39  void unregisterTable(const std::string& table, const std::string& url);
40  void run();
41  private:
42  void processPayload(char *payload);
43  void sendPayload(const std::string& url, const char *payload);
44  void filterPayload(const std::string& url, char *payload, const std::string& asset);
45  void processInsert(char *tableName, char *payload);
46  void processUpdate(char *tableName, char *payload);
47  void processDelete(char *tableName, char *payload);
48  TableRegistration*
49  parseTableSubscriptionPayload(const std::string& payload);
50  void insertTestTableReg();
51  void removeTestTableReg(int n);
52 
53  typedef std::pair<time_t, char *> Item;
54  typedef std::tuple<time_t, char *, char *> TableItem;
55  REGISTRY m_registrations;
56  REGISTRY_TABLE m_tableRegistrations;
57 
58  std::queue<StorageRegistry::Item>
59  m_queue;
60  std::queue<StorageRegistry::TableItem>
61  m_tableInsertQueue;
62  std::queue<StorageRegistry::TableItem>
63  m_tableUpdateQueue;
64  std::queue<StorageRegistry::TableItem>
65  m_tableDeleteQueue;
66  std::mutex m_qMutex;
67  std::mutex m_registrationsMutex;
68  std::mutex m_tableRegistrationsMutex;
69  std::thread *m_thread;
70  std::condition_variable m_cv;
71  std::mutex m_cvMutex;
72  bool m_running;
73 };
74 
75 #endif
void processTableDelete(const std::string &tableName, const std::string &payload)
Process a table delete payload and determine if any microservice has registered an interest in this table.
Definition: storage_registry.cpp:189
void processTableInsert(const std::string &tableName, const std::string &payload)
Process a table insert payload and determine if any microservice has registered an interest in this table.
Definition: storage_registry.cpp:123
StorageRegistry()
StorageRegistry constructor.
Definition: storage_registry.cpp:56
void unregisterAsset(const std::string &asset, const std::string &url)
Handle a request to remove a registration of interest.
Definition: storage_registry.cpp:234
void registerAsset(const std::string &asset, const std::string &url)
Handle a registration request from a client of the storage layer.
Definition: storage_registry.cpp:221
void process(const std::string &payload)
Process a reading append payload and determine if any microservice has registered an interest in this asset.
Definition: storage_registry.cpp:94
StorageRegistry - a class that manages requests from other microservices to register interest in new readings.
Definition: storage_registry.h:28
void run()
The worker function that processes the queue of payloads that may need to be sent to subscribers...
Definition: storage_registry.cpp:392
void registerTable(const std::string &table, const std::string &url)
Handle a registration request for a table from a client of the storage layer.
Definition: storage_registry.cpp:312
~StorageRegistry()
StorageRegistry destructor.
Definition: storage_registry.cpp:65
void unregisterTable(const std::string &table, const std::string &url)
Handle a request to remove a registration of interest in a table.
Definition: storage_registry.cpp:335
void processTableUpdate(const std::string &tableName, const std::string &payload)
Process a table update payload and determine if any microservice has registered an interest in this table.
Definition: storage_registry.cpp:156
Definition: storage_registry.h:13