13 #include <server_http.hpp>
14 #include <storage_plugin.h>
15 #include <storage_stats.h>
16 #include <storage_registry.h>
17 #include <stream_handler.h>
18 #include <perfmonitors.h>
// Shorthand for the SimpleWeb HTTP server type; the request and response
// types used by the handlers below are its nested Request/Response types.
21 using HttpServer = SimpleWeb::Server<SimpleWeb::HTTP>;
// URL path patterns (regular expressions) for the storage service routes.
// Parenthesised groups capture the variable parts of the path (table,
// schema, asset or snapshot names). The code that registers these patterns
// with the HTTP server is not part of this section.
//
// NOTE(review): READING_QUERY, READING_PURGE and STORAGE_SCHEMA have no
// trailing '$' anchor, unlike the other patterns. Whether that widens what
// they accept depends on how the server applies the pattern (full match vs.
// search) - confirm before relying on it.

// Generic table access: /storage/table/<table>. The table name must start
// with a letter and continue with letters, digits or underscores.
26 #define COMMON_ACCESS "^/storage/table/([A-Za-z][a-zA-Z0-9_]*)$"
// Generic table query: /storage/table/<table>/query.
27 #define COMMON_QUERY "^/storage/table/([A-Za-z][a-zA-Z_0-9]*)/query$"
// Readings: /storage/reading, with /query and /purge sub-paths.
28 #define READING_ACCESS "^/storage/reading$"
29 #define READING_QUERY "^/storage/reading/query"
30 #define READING_PURGE "^/storage/reading/purge"
// Reading interest: /storage/reading/interest/<asset>. The first character
// may be a letter, a digit or a literal '*'; the rest may be letters,
// digits, '_', '%', '.' or '-'.
31 #define READING_INTEREST "^/storage/reading/interest/([A-Za-z0-9\\*][a-zA-Z0-9_%\\.\\-]*)$"
// Table interest: /storage/table/interest/<table>. Same as above except the
// first character may not be a digit.
32 #define TABLE_INTEREST "^/storage/table/interest/([A-Za-z\\*][a-zA-Z0-9_%\\.\\-]*)$"
// Snapshots of a table: /storage/table/<table>/snapshot. The same pattern is
// reused for creating a snapshot; the HTTP method presumably tells the two
// apart - confirm where the routes are registered.
34 #define GET_TABLE_SNAPSHOTS "^/storage/table/([A-Za-z][a-zA-Z_0-9_]*)/snapshot$"
35 #define CREATE_TABLE_SNAPSHOT GET_TABLE_SNAPSHOTS
// A specific snapshot: /storage/table/<table>/snapshot/<id>. The id group
// uses '*' with no required first character, so it also matches an empty id.
// The same pattern is reused for deleting a snapshot.
36 #define LOAD_TABLE_SNAPSHOT "^/storage/table/([A-Za-z][a-zA-Z_0-9_]*)/snapshot/([a-zA-Z_0-9_]*)$"
37 #define DELETE_TABLE_SNAPSHOT LOAD_TABLE_SNAPSHOT
// Reading stream creation: /storage/reading/stream.
38 #define CREATE_STORAGE_STREAM "^/storage/reading/stream$"
// Schema routes: /storage/schema, and schema-qualified table access/query
// where group 1 is the schema name and group 2 is the table name.
39 #define STORAGE_SCHEMA "^/storage/schema"
40 #define STORAGE_TABLE_ACCESS "^/storage/schema/([A-Za-z][a-zA-Z0-9_]*)/table/([A-Za-z][a-zA-Z0-9_]*)$"
41 #define STORAGE_TABLE_QUERY "^/storage/schema/([A-Za-z][a-zA-Z0-9_]*)/table/([A-Za-z][a-zA-Z_0-9]*)/query$"
// String constants naming the purge flag values.
// NOTE(review): presumably these are the values accepted for the flags
// option of a reading purge request; their exact meaning is defined by the
// purge implementation, which is not shown here - confirm.
43 #define PURGE_FLAG_RETAIN "retain"
44 #define PURGE_FLAG_RETAIN_ANY "retainany"
45 #define PURGE_FLAG_RETAIN_ALL "retainall"
46 #define PURGE_FLAG_PURGE "purge"
// Positional indices for the variable components of a request path.
// The values line up with the capture-group positions in the route patterns
// (for example schema = group 1 and table = group 2 in STORAGE_TABLE_ACCESS,
// snapshot id = group 2 in LOAD_TABLE_SNAPSHOT), so these are presumably
// used to index the regex match results - confirm in the handlers.
48 #define TABLE_NAME_COMPONENT 1
49 #define STORAGE_SCHEMA_NAME_COMPONENT 1
50 #define STORAGE_TABLE_NAME_COMPONENT 2
51 #define ASSET_NAME_COMPONENT 1
52 #define SNAPSHOT_ID_COMPONENT 2
// Kinds of reading operation that a StorageOperation can represent.
60 enum Operations { ReadingAppend, ReadingPurge, ReadingFetch, ReadingQuery };
// Construct an operation from its kind together with the HTTP request and
// response it belongs to. Only the start of the initializer list is visible
// in this section; the remaining initializers are presumably m_request and
// m_response - confirm.
62 StorageOperation(StorageOperation::Operations operation, shared_ptr<HttpServer::Request> request,
63 shared_ptr<HttpServer::Response> response) :
64 m_operation(operation),
// Which operation this object represents.
73 StorageOperation::Operations m_operation;
// Shared handles to the HTTP request and response for the operation.
74 shared_ptr<HttpServer::Request> m_request;
75 shared_ptr<HttpServer::Response> m_response;
// Construct the storage API.
//
// port           - presumably the port the HTTP listener uses (a matching
//                  m_port member exists) - confirm in the implementation.
// threads        - presumably the HTTP server thread count (m_threads) -
//                  confirm.
// workerPoolSize - presumably the number of worker threads that service
//                  queued operations (m_workerPoolSize) - confirm.
87 StorageApi(
const unsigned short port,
const unsigned int threads,
const unsigned int workerPoolSize);
// Returns a port number; presumably the port the HTTP listener is bound to -
// confirm in the implementation.
97 unsigned short getListenerPort();
// Request handlers. Every handler receives the response object to write to
// followed by the request being served. The binding of route patterns and
// HTTP methods to these handlers happens outside this section, so the
// groupings below are based on the handler names only - confirm against the
// implementation.
//
// Generic table operations (insert, simple query, query, update, delete).
98 void commonInsert(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
99 void commonSimpleQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
100 void commonQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
101 void commonUpdate(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
102 void commonDelete(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Presumably the fallback for requests that match no registered route.
103 void defaultResource(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Reading operations (append, fetch, query, purge).
104 void readingAppend(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
105 void readingFetch(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
106 void readingQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
107 void readingPurge(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Registration and removal of interest in readings and in tables.
108 void readingRegister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
109 void readingUnregister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
110 void tableRegister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
111 void tableUnregister(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Table snapshot operations (create, load, delete, list).
112 void createTableSnapshot(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
113 void loadTableSnapshot(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
114 void deleteTableSnapshot(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
115 void getTableSnapshots(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Storage stream creation.
116 void createStorageStream(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Schema creation and schema-qualified table operations.
118 void createStorageSchema(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
119 void storageTableInsert(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
120 void storageTableUpdate(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
121 void storageTableDelete(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
122 void storageTableSimpleQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
123 void storageTableQuery(shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request);
// Create a schema from the given string.
// NOTE(review): the expected format of `schema` (name vs. full definition)
// and the meaning of the bool result (presumably true on success) are not
// visible in this section - confirm in the implementation.
127 bool createSchema(
const std::string& schema);
// Set the HTTP server's request timeout by writing `timeout` to
// m_server->config.timeout_request.
// NOTE(review): the unit is whatever SimpleWeb uses for timeout_request
// (seconds according to its documentation - confirm). m_server is
// dereferenced without a null check here.
128 void setTimeout(
long timeout)
132 m_server->config.timeout_request = timeout;
// Accessor: returns the m_perfMonitor member (a pointer, per the leading '*').
138 *getPerformanceMonitor() {
return m_perfMonitor; };
// Queue an operation of kind `op` for the given request/response pair.
// NOTE(review): presumably this wraps the arguments in a StorageOperation and
// hands it to the worker threads via the operation queue - confirm in the
// implementation. Note the argument order is (request, response), the
// reverse of the handler signatures.
140 void queue(StorageOperation::Operations op, shared_ptr<HttpServer::Request> request, shared_ptr<HttpServer::Response> response);
// Atomic counter of workers; what exactly is counted (running vs. busy
// workers) is decided by the implementation - confirm.
142 std::atomic<int> m_workers_count;
// The HTTP server instance (raw pointer; ownership is not visible here).
146 HttpServer *m_server;
// Presumably the listener port and HTTP server thread count passed to the
// constructor - confirm.
147 unsigned short m_port;
148 unsigned int m_threads;
// Map from a string key to a pair of (int, iterator into a
// std::list<std::string>).
// NOTE(review): the names suggest a sequence-number cache with
// least-recently-used eviction, where the iterator points at the key's
// position in seqnum_map_lru_list, the cache is capped at
// max_entries_in_seqnum_map (16) entries, and mtx_seqnum_map guards both
// containers - confirm in the implementation.
153 std::map<string, pair<int,std::list<std::string>::iterator>> m_seqnum_map;
154 const unsigned int max_entries_in_seqnum_map = 16;
155 std::list<std::string> seqnum_map_lru_list;
156 std::mutex mtx_seqnum_map;
// Send a string payload on the given response. No status code is passed to
// this overload; which status it uses is decided in the implementation.
158 void respond(shared_ptr<HttpServer::Response>,
const string&);
// Send a string payload on the given response with an explicit status code.
159 void respond(shared_ptr<HttpServer::Response>, SimpleWeb::StatusCode,
const string&);
// Report an exception on the given response; presumably as an internal
// server error built from the exception's message - confirm.
160 void internalError(shared_ptr<HttpServer::Response>,
const exception&);
// Mutex and condition variable for the operation queue (per their names);
// presumably used to hand queued StorageOperation objects to the worker
// threads - confirm in the implementation.
165 std::mutex m_queueMutex;
166 std::condition_variable m_queueCV;
// Queue of pending operations, held as raw pointers. The member name is on a
// line not included in this section.
167 std::queue<StorageOperation *>
// Worker threads, held as raw pointers. The member name is on a line not
// included in this section.
169 std::vector<std::thread *>
// Presumably the number of worker threads, as passed to the constructor -
// confirm.
171 unsigned int m_workerPoolSize;
186 void writeData(
const std::string& table,
const InsertValues& values) {
187 m_instance->getStoragePlugin()->commonInsert(table,