Fledge
An open source edge computing platform for industrial users
readings_catalogue.h
1 #ifndef _READINGS_CATALOGUE_H
2 #define _READINGS_CATALOGUE_H
3 /*
4  * Fledge storage service - Readings catalogue handling
5  *
6  * Copyright (c) 2020 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Stefano Simonelli, Massimiliano Pinto
11  */
12 
13 #include "connection.h"
14 #include <thread>
15 
16 #define OVERFLOW_TABLE_ID 0 // Table ID to use for the overflow table
17 
22  public:
24  unsigned long GetMinReadingId();
25  void SetThreadTransactionStart(std::thread::id tid,
26  unsigned long id);
27  void ClearThreadTransaction(std::thread::id);
28 
29  private:
30  std::map<std::thread::id, unsigned long>
31  m_boundaries;
32  std::mutex m_boundaryLock;
33 };
34 
35 
44 typedef struct
45 {
46  int poolSize = 5;
47  int nReadingsPerDb = 14;
48  int nDbPreallocate = 3;
49  int nDbLeftFreeBeforeAllocate = 1;
50  int nDbToAllocate = 2;
51 
53 
58  public:
59  TableReference(int dbId, int tableId) : m_dbId(dbId), m_tableId(tableId)
60  {
61  m_issued = time(0);
62  };
63  time_t lastIssued()
64  {
65  return m_issued;
66  };
67  int getTable()
68  {
69  return m_tableId;
70  };
71  int getDatabase()
72  {
73  return m_dbId;
74  };
75  void issue()
76  {
77  m_issued = time(0);
78  };
79  private:
80  int m_dbId;
81  int m_tableId;
82  time_t m_issued;
83 };
84 
128 
129 public:
130  typedef struct ReadingReference {
131  int dbId;
132  int tableId;
133 
135 
136  static ReadingsCatalogue *getInstance()
137  {
138  static ReadingsCatalogue *instance = 0;
139 
140  if (!instance)
141  {
142  instance = new ReadingsCatalogue;
143  }
144  return instance;
145  }
146 
147  void multipleReadingsInit(STORAGE_CONFIGURATION &storageConfig);
148  std::string generateDbAlias(int dbId);
149  std::string generateDbName(int tableId);
150  std::string generateDbFileName(int dbId);
151  std::string generateDbNameFromTableId(int tableId);
152  std::string generateReadingsName(int dbId, int tableId);
153  void getAllDbs(std::vector<int> &dbIdList);
154  void getNewDbs(std::vector<int> &dbIdList);
155  int getMaxReadingsId(int dbId);
156  int getReadingsCount();
157  int getReadingPosition(int dbId, int tableId);
158  int getNReadingsAvailable() const {return m_nReadingsAvailable;}
159  long getIncGlobalId() { return m_ReadingsGlobalId.fetch_add(1); } // returns the value before the add operation
160  long getMinGlobalId (sqlite3 *dbHandle);
161  long getGlobalId() {return m_ReadingsGlobalId;};
162  bool evaluateGlobalId();
163  bool storeGlobalId ();
164 
165  void preallocateReadingsTables(int dbId);
166  bool loadAssetReadingCatalogue();
167  bool loadEmptyAssetReadingCatalogue(bool clean = true);
168 
169  bool latestDbUpdate(sqlite3 *dbHandle, int newDbId);
170  int preallocateNewDbsRange(int dbIdStart, int dbIdEnd);
171  tyReadingReference getEmptyReadingTableReference(std::string& asset);
172  tyReadingReference getReadingReference(Connection *connection, const char *asset_code);
173  bool attachDbsToAllConnections();
174  std::string sqlConstructMultiDb(std::string &sqlCmdBase, std::vector<std::string> &assetCodes, bool considerExclusion=false);
175  std::string sqlConstructOverflow(std::string &sqlCmdBase, std::vector<std::string> &assetCodes, bool considerExclusion=false, bool groupBy = false);
176  int purgeAllReadings(sqlite3 *dbHandle, const char *sqlCmdBase, char **errMsg = NULL, unsigned long *rowsAffected = NULL);
177 
178  bool connectionAttachAllDbs(sqlite3 *dbHandle);
179  bool connectionAttachDbList(sqlite3 *dbHandle, std::vector<int> &dbIdList);
180  bool attachDb(sqlite3 *dbHandle, std::string &path, std::string &alias, int dbId);
181  void detachDb(sqlite3 *dbHandle, std::string &alias);
182 
183  void setUsedDbId(int dbId);
184  int extractReadingsIdFromName(std::string tableName);
185  int extractDbIdFromName(std::string tableName);
186  int SQLExec(sqlite3 *dbHandle, const char *sqlCmd, char **errMsg = NULL);
187  bool createReadingsOverflowTable(sqlite3 *dbHandle, int dbId);
188  int getMaxAttached() { return m_attachLimit; };
189 
190 
191 private:
192  STORAGE_CONFIGURATION m_storageConfigCurrent; // The current configuration of the multiple readings
193  STORAGE_CONFIGURATION m_storageConfigApi; // The parameters retrieved from the API
194 
195  enum NEW_DB_OPERATION {
196  NEW_DB_ATTACH_ALL,
197  NEW_DB_ATTACH_REQUEST,
198  NEW_DB_DETACH
199  };
200 
201  enum ACTION {
202  ACTION_DB_ADD,
203  ACTION_DB_REMOVE,
204  ACTION_DB_NONE,
205  ACTION_TB_ADD,
206  ACTION_TB_REMOVE,
207  ACTION_TB_NONE,
208  ACTION_INVALID
209  };
210 
211  typedef struct ReadingAvailable {
212  int lastReadings;
213  int tableCount;
214 
215  } tyReadingsAvailable;
216 
218 
219  bool createNewDB(sqlite3 *dbHandle, int newDbId, int startId, NEW_DB_OPERATION attachAllDb);
220  int getUsedTablesDbId(int dbId);
221  int getNReadingsAllocate() const {return m_storageConfigCurrent.nReadingsPerDb;}
222  bool createReadingsTables(sqlite3 *dbHandle, int dbId, int idStartFrom, int nTables);
223  bool isReadingAvailable() const;
224  void allocateReadingAvailable();
225  tyReadingsAvailable evaluateLastReadingAvailable(sqlite3 *dbHandle, int dbId);
226  long calculateGlobalId (sqlite3 *dbHandle);
227  std::string generateDbFilePath(int dbId);
228 
229  void raiseError(const char *operation, const char *reason,...);
230  int SQLStep(sqlite3_stmt *statement);
231  bool enableWAL(std::string &dbPathReadings);
232 
233  bool configurationRetrieve(sqlite3 *dbHandle);
234  void prepareAllDbs();
235  bool applyStorageConfigChanges(sqlite3 *dbHandle);
236  void dbFileDelete(std::string dbPath);
237  void dbsRemove(int startId, int endId);
238  void storeReadingsConfiguration (sqlite3 *dbHandle);
239  ACTION changesLogicDBs(int dbIdCurrent , int dbIdLast, int nDbPreallocateCurrent, int nDbPreallocateRequest, int nDbLeftFreeBeforeAllocate);
240  ACTION changesLogicTables(int maxUsed ,int Current, int Request);
241  int retrieveDbIdFromTableId(int tableId);
242 
243  void configChangeAddDb(sqlite3 *dbHandle);
244  void configChangeRemoveDb(sqlite3 *dbHandle);
245  void configChangeAddTables(sqlite3 *dbHandle , int startId, int endId);
246  void configChangeRemoveTables(sqlite3 *dbHandle , int startId, int endId);
247 
248  int calcMaxReadingUsed();
249  void dropReadingsTables(sqlite3 *dbHandle, int dbId, int idStart, int idEnd);
250 
251 
252  int m_dbIdCurrent; // Current database in use
253  int m_dbIdLast; // Last database available not already in use
254  int m_dbNAvailable; // Number of databases available
255  std::vector<int>
256  m_dbIdList; // Databases already created but not in use
257 
258  std::atomic<long>
259  m_ReadingsGlobalId; // Global row id shared among all the readings table
260  int
261  m_nReadingsAvailable = 0; // Number of readings tables available
262  std::map <std::string, TableReference> m_AssetReadingCatalogue={ // In memory structure to identify in which database/table an asset is stored
263 
264  // asset_code - reading Table Id, Db Id
265  // {"", ,{1 ,1 }}
266  };
267  std::map <std::string, std::pair<int, int>> m_EmptyAssetReadingCatalogue={ // In memory structure to identify in which database/table an asset is empty
268  // asset_code - reading Table Id, Db Id
269  // {"", ,{1 ,1 }}
270  };
271  int m_nextOverflow; // The next database to use for overflow assets
272  int m_attachLimit;
273  int m_maxOverflowUsed;
274  int m_compounds; // Max number of compound statements
275  std::mutex m_emptyReadingTableMutex;
276 public:
277  TransactionBoundary m_tx;
278 
279 };
280 
285 
286 public:
287  static AttachDbSync *getInstance()
288  {
289  static AttachDbSync instance;
290  return &instance;
291  }
292 
293  void lock() {m_dbLock.lock();}
294  void unlock() {m_dbLock.unlock();}
295 
296 private:
297  AttachDbSync(){};
298 
299  std::mutex m_dbLock;
300 };
301 
302 #endif
Implements the handling of multiples readings tables stored among multiple SQLite databases...
Definition: readings_catalogue.h:127
void ClearThreadTransaction(std::thread::id)
Remove committed TRANSACTION for the given thread.
Definition: readings_catalogue.cpp:2897
Definition: readings_catalogue.h:130
This class handles per thread started transaction boundaries:
Definition: readings_catalogue.h:21
Used to synchronize the attach database operation.
Definition: readings_catalogue.h:284
void SetThreadTransactionStart(std::thread::id tid, unsigned long id)
Set BEGIN of a transaction for a given thread, reading id.
Definition: readings_catalogue.cpp:2926
Class used to store table references.
Definition: readings_catalogue.h:57
unsigned long GetMinReadingId()
Fetch the minimum safe global reading id among all UNCOMMITTED per thread transactions.
Definition: readings_catalogue.cpp:2948
Definition: connection.h:32
Definition: readings_catalogue.h:44