Fledge
An open source edge computing platform for industrial users
storage_api.h
1 #ifndef _STORAGE_API_H
2 #define _STORAGE_API_H
3 /*
4  * Fledge storage service.
5  *
6  * Copyright (c) 2017 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch, Massimiliano Pinto
11  */
12 
13 #include <server_http.hpp>
14 #include <storage_plugin.h>
15 #include <storage_stats.h>
16 #include <storage_registry.h>
17 #include <stream_handler.h>
18 #include <perfmonitors.h>
19 
20 using namespace std;
21 using HttpServer = SimpleWeb::Server<SimpleWeb::HTTP>;
22 
23 /*
24  * The URL for each entry point
25  */
// Path patterns for the common table and readings entry points. Each
// parenthesised group captures one path component: the name that follows
// /storage/table/, /storage/reading/interest/ or /storage/table/interest/.
// NOTE(review): READING_QUERY and READING_PURGE have no trailing '$',
// unlike the other patterns in this group - confirm that is intentional.
26 #define COMMON_ACCESS "^/storage/table/([A-Za-z][a-zA-Z0-9_]*)$"
27 #define COMMON_QUERY "^/storage/table/([A-Za-z][a-zA-Z_0-9]*)/query$"
28 #define READING_ACCESS "^/storage/reading$"
29 #define READING_QUERY "^/storage/reading/query"
30 #define READING_PURGE "^/storage/reading/purge"
// The two interest patterns also accept a literal '*' as the first character
// and '%', '.' and '-' in the remainder of the captured name.
31 #define READING_INTEREST "^/storage/reading/interest/([A-Za-z0-9\\*][a-zA-Z0-9_%\\.\\-]*)$"
32 #define TABLE_INTEREST "^/storage/table/interest/([A-Za-z\\*][a-zA-Z0-9_%\\.\\-]*)$"
33 
// Path patterns for the snapshot, stream and schema entry points.
// CREATE_TABLE_SNAPSHOT reuses the GET_TABLE_SNAPSHOTS pattern and
// DELETE_TABLE_SNAPSHOT reuses LOAD_TABLE_SNAPSHOT, so each pair matches the
// same path. NOTE(review): presumably they are told apart by HTTP method
// where the resources are registered - confirm.
// NOTE(review): '_' is listed twice in the class [a-zA-Z_0-9_] (redundant but
// harmless), and the snapshot id group uses '*', so an empty id also matches.
34 #define GET_TABLE_SNAPSHOTS "^/storage/table/([A-Za-z][a-zA-Z_0-9_]*)/snapshot$"
35 #define CREATE_TABLE_SNAPSHOT GET_TABLE_SNAPSHOTS
36 #define LOAD_TABLE_SNAPSHOT "^/storage/table/([A-Za-z][a-zA-Z_0-9_]*)/snapshot/([a-zA-Z_0-9_]*)$"
37 #define DELETE_TABLE_SNAPSHOT LOAD_TABLE_SNAPSHOT
38 #define CREATE_STORAGE_STREAM "^/storage/reading/stream$"
// NOTE(review): STORAGE_SCHEMA has no trailing '$' - confirm that is intentional.
39 #define STORAGE_SCHEMA "^/storage/schema"
// The schema-qualified table patterns capture the schema name first and the
// table name second.
40 #define STORAGE_TABLE_ACCESS "^/storage/schema/([A-Za-z][a-zA-Z0-9_]*)/table/([A-Za-z][a-zA-Z0-9_]*)$"
41 #define STORAGE_TABLE_QUERY "^/storage/schema/([A-Za-z][a-zA-Z0-9_]*)/table/([A-Za-z][a-zA-Z_0-9]*)/query$"
42 
// String constants naming the purge flags.
// NOTE(review): presumably the accepted values of a flags argument on the
// readings purge request - confirm against the readingPurge() implementation.
43 #define PURGE_FLAG_RETAIN "retain"
44 #define PURGE_FLAG_RETAIN_ANY "retainany"
45 #define PURGE_FLAG_RETAIN_ALL "retainall"
46 #define PURGE_FLAG_PURGE "purge"
47 
// Numeric component indices.
// NOTE(review): presumably the regex capture-group positions used to pull a
// name out of a matched request path (1 = first group, 2 = second group of
// the URL patterns defined earlier in this header) - confirm at the call sites.
48 #define TABLE_NAME_COMPONENT 1
49 #define STORAGE_SCHEMA_NAME_COMPONENT 1
50 #define STORAGE_TABLE_NAME_COMPONENT 2
51 #define ASSET_NAME_COMPONENT 1
52 #define SNAPSHOT_ID_COMPONENT 2
53 
58 class StorageOperation {
59  public:
60  enum Operations { ReadingAppend, ReadingPurge, ReadingFetch, ReadingQuery };
61  public:
62  StorageOperation(StorageOperation::Operations operation, shared_ptr<HttpServer::Request> request,
63  shared_ptr<HttpServer::Response> response) :
64  m_operation(operation),
65  m_request(request),
66  m_response(response)
67  {
68  };
70  {
71  };
72  public:
73  StorageOperation::Operations m_operation;
74  shared_ptr<HttpServer::Request> m_request;
75  shared_ptr<HttpServer::Response> m_response;
76 };
77 
// The Storage API class. It declares the HTTP entry-point handlers of the
// storage service together with the server, plugin, registry and worker
// queue state they share. Only setTimeout(), getStoragePlugin() and
// getPerformanceMonitor() are defined here; every other member function is
// defined outside this header.
84 class StorageApi {
85 
86 public:
87  StorageApi(const unsigned short port, const unsigned int threads, const unsigned int workerPoolSize);
88  ~StorageApi();
 // NOTE(review): presumably returns the static m_instance pointer - confirm in the implementation.
89  static StorageApi *getInstance();
90  void initResources();
91  void setPlugin(StoragePlugin *);
92  void setReadingPlugin(StoragePlugin *);
93  void start();
94  void startServer();
95  void wait();
96  void stopServer();
97  unsigned short getListenerPort();
 // Request handlers. Each one receives the HTTP response object to write to
 // and the HTTP request it is answering.
98  void commonInsert(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
99  void commonSimpleQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
100  void commonQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
101  void commonUpdate(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
102  void commonDelete(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
103  void defaultResource(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
104  void readingAppend(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
105  void readingFetch(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
106  void readingQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
107  void readingPurge(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
108  void readingRegister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
109  void readingUnregister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
110  void tableRegister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
111  void tableUnregister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
112  void createTableSnapshot(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
113  void loadTableSnapshot(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
114  void deleteTableSnapshot(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
115  void getTableSnapshots(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
116  void createStorageStream(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
117  bool readingStream(ReadingStream **readings, bool commit);
118  void createStorageSchema(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
119  void storageTableInsert(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
120  void storageTableUpdate(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
121  void storageTableDelete(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
122  void storageTableSimpleQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
123  void storageTableQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
124 
125 
126  void printList();
127  bool createSchema(const std::string& schema);
 // Store the given value in the HTTP server's config.timeout_request.
 // Does nothing when m_server is null. The unit is whatever
 // timeout_request expects; it is not stated in this header.
128  void setTimeout(long timeout)
129  {
130  if (m_server)
131  {
132  m_server->config.timeout_request = timeout;
133  }
134  };
135 
 // Return the plugin pointer held in the private member "plugin".
136  StoragePlugin *getStoragePlugin() { return plugin; };
 // Return the m_perfMonitor pointer.
 // NOTE(review): listing line 137, which should carry the pointee type of
 // this accessor's return value, is absent from this copy - restore it from
 // the original header.
138  *getPerformanceMonitor() { return m_perfMonitor; };
139  void worker();
 // Takes the same operation code, request and response as the
 // StorageOperation constructor.
140  void queue(StorageOperation::Operations op, shared_ptr<HttpServer::Request> request, shared_ptr<HttpServer::Response> response);
141 public:
 // NOTE(review): presumably the number of active worker threads - confirm.
142  std::atomic<int> m_workers_count;
143 
144 private:
145  static StorageApi *m_instance;
146  HttpServer *m_server;
147  unsigned short m_port;
148  unsigned int m_threads;
149  thread *m_thread;
150  StoragePlugin *plugin;
151  StoragePlugin *readingPlugin;
152  StorageStats stats;
 // Maps a string key to a pair of an int and an iterator into
 // seqnum_map_lru_list.
153  std::map<string, pair<int,std::list<std::string>::iterator>> m_seqnum_map;
154  const unsigned int max_entries_in_seqnum_map = 16;
155  std::list<std::string> seqnum_map_lru_list; // has the most recently accessed elements of m_seqnum_map at front of the list
 // NOTE(review): presumably guards m_seqnum_map and seqnum_map_lru_list - confirm.
156  std::mutex mtx_seqnum_map;
157  StorageRegistry registry;
158  void respond(shared_ptr<HttpServer::Response>, const string&);
159  void respond(shared_ptr<HttpServer::Response>, SimpleWeb::StatusCode, const string&);
160  void internalError(shared_ptr<HttpServer::Response>, const exception&);
161  void mapError(string&, PLUGIN_ERROR *);
162  StreamHandler *streamHandler;
 // NOTE(review): listing line 163, which should carry the pointee type of
 // m_perfMonitor, is absent from this copy - restore it from the original
 // header.
164  *m_perfMonitor;
 // Queue of StorageOperation pointers and the pool of worker threads.
 // NOTE(review): presumably m_queueMutex and m_queueCV protect and signal
 // m_queue, and m_shutdown tells the workers to exit - confirm in worker().
165  std::mutex m_queueMutex;
166  std::condition_variable m_queueCV;
167  std::queue<StorageOperation *>
168  m_queue;
169  std::vector<std::thread *>
170  m_workers;
171  unsigned int m_workerPoolSize;
172  bool m_shutdown;
173 };
174 
180  public:
181  // Constructor with StorageApi pointer passed (also calling parent PerformanceMonitor constructor)
182  StoragePerformanceMonitor(const std::string& name, StorageApi *api) :
183  PerformanceMonitor(name, NULL), m_name(name), m_instance(api) {
184  };
185  // Direct write to storage of monitor data
186  void writeData(const std::string& table, const InsertValues& values) {
187  m_instance->getStoragePlugin()->commonInsert(table,
188  values.toJSON());
189  }
190  private:
191  std::string m_name;
192  StorageApi *m_instance;
193 };
194 
195 #endif
Definition: stream_handler.h:26
Class that represents a storage plugin.
Definition: storage_plugin.h:34
StoragePerformanceMonitor is a derived class from PerformanceMonitor. It allows direct writing of moni...
Definition: storage_api.h:179
Definition: asset_tracking.h:108
Definition: reading_stream.h:42
Class to handle the performance monitors.
Definition: perfmonitors.h:35
StorageRegistry - a class that manages requests from other microservices to register interest in new ...
Definition: storage_registry.h:28
Definition: storage_stats.h:15
The Storage API class - this class is responsible for the registration of all API entry points in the...
Definition: storage_api.h:84
Structure used by plugins to return error information.
Definition: plugin_api.h:44
Definition: insert.h:145
StorageOperation.
Definition: configuration_manager.h:146