Fledge
An open source edge computing platform for industrial users
connection.h
1 #ifndef _CONNECTION_H
2 #define _CONNECTION_H
3 /*
4  * Fledge storage service.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Massimiliano Pinto
11  */
12 
13 #include <sql_buffer.h>
14 #include <string>
15 #include <rapidjson/document.h>
16 #include <sqlite3.h>
17 #include <mutex>
18 #include <reading_stream.h>
19 #include <schema.h>
20 #include <map>
21 #include <vector>
22 #include <atomic>
23 
24 class ConnectionManager;
25 
// Compile-time switch. When non-zero, the Connection class in this header also
// compiles its m_usage member and the setUsage()/clearUsage()/getUsage() accessors.
26 #define TRACK_CONNECTION_USER 0 // Set to 1 to get diagnostics about connection pool use
27 
// Names of the readings database and tables, built by compile-time concatenation
// of adjacent string literals. READINGS_DB_NAME_BASE is not defined in this header,
// so it must be defined before any of these macros is expanded.
// File name of the readings database: "/" + base + "_1.db"
28 #define READINGS_DB_FILE_NAME "/" READINGS_DB_NAME_BASE "_1.db"
// Name of the readings database: base + "_1"
29 #define READINGS_DB READINGS_DB_NAME_BASE "_1"
// Base name of the readings table
30 #define READINGS_TABLE "readings"
// Expands to "readings_1"
31 #define READINGS_TABLE_MEM READINGS_TABLE "_1"
32 
33 
34 // Set plugin name for log messages.
// The #ifndef guard means a definition supplied earlier (for example by the build
// or by an including file) takes precedence over this default.
35 #ifndef PLUGIN_LOG_NAME
36 #define PLUGIN_LOG_NAME "SQLite3"
37 #endif
38 
39 // Retry mechanism
// NOTE(review): presumably these govern retries around statement preparation
// (see Connection::SQLPrepare) - confirm against the implementation.
// The time unit of the two wait values is not stated in this header.
40 #define PREP_CMD_MAX_RETRIES 50 // Maximum no. of retries when a lock is encountered
41 #define PREP_CMD_RETRY_BASE 50 // Base time to wait for
42 #define PREP_CMD_RETRY_BACKOFF 50 // Variable time to wait for
43 
// General retry limits for database operations that hit a lock.
// The time unit that RETRY_BACKOFF multiplies is not stated in this header.
44 #define MAX_RETRIES 40 // Maximum no. of retries when a lock is encountered
45 #define RETRY_BACKOFF 50 // Multiplier to backoff DB retry on lock
46 
47 /*
48  * Control the way purge deletes readings. The block size sets a limit as to how many rows
49  * get deleted in each call, whilst the sleep interval controls how long the thread sleeps
50  * between deletes. The idea is to not keep the database locked too long and allow other threads
51  * to have access to the database between blocks.
52  */
// NOTE(review): presumably the sleep interval described above, in milliseconds
// as the _MS suffix suggests - confirm against the purge implementation.
53 #define PURGE_SLEEP_MS 500
54 
// Purge block size in rows, with a lower and an upper bound.
// NOTE(review): presumably PURGE_DELETE_BLOCK_SIZE is the starting value and the
// MIN/MAX values clamp a dynamically recalculated size - confirm against the purge code.
55 #define PURGE_DELETE_BLOCK_SIZE 10000
56 #define MIN_PURGE_DELETE_BLOCK_SIZE 1000
57 #define MAX_PURGE_DELETE_BLOCK_SIZE 10000
58 
// Tuning values for recalculating the purge block size.
// TARGET_PURGE_BLOCK_DEL_TIME evaluates to 70000; since the comment gives it as
// 70 msec, the value is expressed in microseconds.
59 #define TARGET_PURGE_BLOCK_DEL_TIME (70*1000) // 70 msec
60 #define PURGE_BLOCK_SZ_GRANULARITY 5 // 5 rows
61 #define RECALC_PURGE_BLOCK_SIZE_NUM_BLOCKS 30 // recalculate purge block size after every 30 blocks
62 
// NOTE(review): presumably the purge pauses for PURGE_SLOWDOWN_SLEEP_MS milliseconds
// after every PURGE_SLOWDOWN_AFTER_BLOCKS deleted blocks - confirm against the purge code.
63 #define PURGE_SLOWDOWN_AFTER_BLOCKS 5
64 #define PURGE_SLOWDOWN_SLEEP_MS 500
65 
// Date conversion constants. Both are string literals rather than numbers.
// NOTE(review): presumably so they can be spliced into SQL text - confirm at the call sites.
// Number of seconds in one day (24 * 60 * 60).
66 #define SECONDS_PER_DAY "86400.0"
67 // 2440587.5 is the julian day at 1/1/1970 0:00 UTC.
68 #define JULIAN_DAY_START_UNIXTIME "2440587.5"
69 
70 
// Timing helpers for measuring elapsed time in a block of code.
// START_TIME declares a local variable t1 holding the current high_resolution_clock time.
// END_TIME declares t2 (current time) and usecs (t2 - t1 in whole microseconds), so it
// must be expanded in a scope where the t1 declared by START_TIME is visible.
// Both macros declare fixed variable names and already end in a semicolon, so each can
// be expanded at most once per scope.
// This header's own include list does not contain <chrono>; the names must be
// available at the point of expansion.
70 #define START_TIME std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
71 #define END_TIME std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now(); \
72  auto usecs = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
74 
75 
// Row callback with the (void*, int, char**, char**) -> int shape that
// Connection::SQLexec accepts as its callback argument.
// NOTE(review): presumably extracts a date value from the result row into `data` -
// confirm against the definition.
76 int dateCallback(void *data, int nCols, char **colValues, char **colNames);
// Takes a format string and a column name and writes its result into outFormat,
// which is the only non-const argument. roundMs defaults to false.
// NOTE(review): presumably builds an SQL date-formatting expression for colName and
// returns whether a format was applied - confirm against the definition.
77 bool applyColumnDateFormat(const std::string& inFormat,
78  const std::string& colName,
79  std::string& outFormat,
80  bool roundMs = false);
81 
// Same signature as applyColumnDateFormat: writes its result into outFormat;
// roundMs defaults to false.
// NOTE(review): presumably the variant that converts to local time, as the name
// suggests - confirm against the definition.
82 bool applyColumnDateFormatLocaltime(const std::string& inFormat,
83  const std::string& colName,
84  std::string& outFormat,
85  bool roundMs = false);
86 
// Row callback with the (void*, int, char**, char**) -> int shape that
// Connection::SQLexec accepts as its callback argument.
// NOTE(review): presumably stores a row id from the result row into `data` -
// confirm against the definition.
87 int rowidCallback(void *data,
88  int nCols,
89  char **colValues,
90  char **colNames);
91 
// Row callback with the (void*, int, char**, char**) -> int shape that
// Connection::SQLexec accepts as its callback argument.
// NOTE(review): what is written through `data` is not visible in this header -
// confirm against the definition.
92 int selectCallback(void *data,
93  int nCols,
94  char **colValues,
95  char **colNames);
96 
// Row callback with the (void*, int, char**, char**) -> int shape that
// Connection::SQLexec accepts as its callback argument.
// NOTE(review): presumably stores a row count from the result row into `data` -
// confirm against the definition.
97 int countCallback(void *data,
98  int nCols,
99  char **colValues,
100  char **colNames);
101 
// Takes a format string and writes its result into outFormat (the only non-const argument).
// NOTE(review): presumably maps a requested date format onto its SQL equivalent and
// returns whether a mapping was found - confirm against the definition.
102 bool applyDateFormat(const std::string& inFormat, std::string& outFormat);
103 
104 class Connection {
105  public:
106  Connection(ConnectionManager *manager);
107  ~Connection();
108 #ifndef SQLITE_SPLIT_READINGS
109  bool createSchema(const std::string& schema);
110  bool retrieve(const std::string& schema,
111  const std::string& table,
112  const std::string& condition,
113  std::string& resultSet);
114  int insert(const std::string& schema,
115  const std::string& table,
116  const std::string& data);
117  int update(const std::string& schema,
118  const std::string& table,
119  const std::string& data);
120  int deleteRows(const std::string& schema,
121  const std::string& table,
122  const std::string& condition);
123  int create_table_snapshot(const std::string& table, const std::string& id);
124  int load_table_snapshot(const std::string& table, const std::string& id);
125  int delete_table_snapshot(const std::string& table, const std::string& id);
126  bool get_table_snapshots(const std::string& table, std::string& resultSet);
127 #endif
128  int appendReadings(const char *readings);
129  int readingStream(ReadingStream **readings, bool commit);
130  bool fetchReadings(unsigned long id, unsigned int blksize,
131  std::string& resultSet);
132  bool retrieveReadings(const std::string& condition,
133  std::string& resultSet);
134  unsigned int purgeReadings(unsigned long age, unsigned int flags,
135  unsigned long sent, std::string& results);
136  unsigned int purgeReadingsByRows(unsigned long rowcount, unsigned int flags,
137  unsigned long sent, std::string& results);
138  long tableSize(const std::string& table);
139  void setTrace(bool);
140  bool formatDate(char *formatted_date, size_t formatted_date_size, const char *date);
141  bool aggregateQuery(const rapidjson::Value& payload, std::string& resultSet);
142  bool getNow(std::string& Now);
143 
144  sqlite3 *getDbHandle() {return dbHandle;};
145  void setUsedDbId(int dbId);
146 
147  void shutdownAppendReadings();
148  unsigned int purgeReadingsAsset(const std::string& asset);
149  bool vacuum();
150  bool supportsReadings() { return ! m_noReadings; };
151 #if TRACK_CONNECTION_USER
152  void setUsage(std::string usage) { m_usage = usage; };
153  void clearUsage() { m_usage = ""; };
154  std::string getUsage() { return m_usage; };
155 #endif
156 
157  private:
158  std::string operation(const char *sql);
159  std::vector<int>
160  m_NewDbIdList; // Newly created databases that should be attached
161 
162  bool m_streamOpenTransaction;
163  int m_queuing;
164  std::mutex m_qMutex;
165  int SQLPrepare(sqlite3 *dbHandle, const char *sqlCmd, sqlite3_stmt **readingsStmt);
166  int SQLexec(sqlite3 *db, const std::string& table, const char *sql,
167  int (*callback)(void*,int,char**,char**),
168  void *cbArg, char **errmsg);
169 
170  int SQLstep(sqlite3_stmt *statement);
171  bool m_logSQL;
172  void raiseError(const char *operation, const char *reason,...);
173  sqlite3 *dbHandle;
174  SchemaManager *m_schemaManager;
175  int mapResultSet(void *res, std::string& resultSet, unsigned long *rowsCount = nullptr);
176 #ifndef SQLITE_SPLIT_READINGS
177  bool jsonWhereClause(const rapidjson::Value& whereClause, SQLBuffer&, std::vector<std::string> &asset_codes, bool convertLocaltime = false, std::string prefix = "");
178 #else
179  bool jsonWhereClause(const rapidjson::Value& whereClause, SQLBuffer&, bool convertLocaltime = false, std::string prefix = "");
180 #endif
181  bool jsonModifiers(const rapidjson::Value&, SQLBuffer&, bool isTableReading = false);
182 #ifndef SQLITE_SPLIT_READINGS
183  bool jsonAggregates(const rapidjson::Value&,
184  const rapidjson::Value&,
185  SQLBuffer&,
186  SQLBuffer&,
187  bool &isOptAggregate,
188  bool isTableReading = false,
189  bool isExtQuery = false
190  );
191 #else
192  bool jsonAggregates(const rapidjson::Value&,
193  const rapidjson::Value&,
194  SQLBuffer&,
195  SQLBuffer&,
196  bool isTableReading = false);
197 #endif
198  bool returnJson(const rapidjson::Value&, SQLBuffer&, SQLBuffer&);
199  char *trim(char *str);
200  const std::string
201  escape(const std::string&);
202  bool applyColumnDateTimeFormat(sqlite3_stmt *pStmt,
203  int i,
204  std::string& newDate);
205  void logSQL(const char *, const char *);
206  bool selectColumns(const rapidjson::Value& document, SQLBuffer& sql, int level);
207  bool appendTables(const std::string& schema, const rapidjson::Value& document, SQLBuffer& sql, int level);
208  bool processJoinQueryWhereClause(const rapidjson::Value& query, SQLBuffer& sql, std::vector<std::string> &asset_codes, int level);
209  bool m_noReadings;
210 #if TRACK_CONNECTION_USER
211  std::string m_usage;
212 #endif
214  *m_manager;
215 };
216 
217 #endif
int readingStream(ReadingStream **readings, bool commit)
Append a stream of readings to SQLite db.
Definition: readings.cpp:396
Singleton class to manage Postgres connection pool.
Definition: connection_manager.h:22
bool createSchema(const std::string &schema)
Create schema and populate with tables and indexes as defined in the JSON schema definition.
Definition: connection.cpp:3841
~Connection()
Destructor for the database connection.
Definition: connection.cpp:373
Definition: reading_stream.h:42
void shutdownAppendReadings()
Wait until all the threads executing appendReadings have shut down.
Definition: readings.cpp:653
int insert(const std::string &table, const std::string &data)
Insert data into a table.
Definition: connection.cpp:921
Buffer class designed to hold SQL statements that can grow as required but have minimal copy semantics...
Definition: sql_buffer.h:22
The singleton SchemaManager class used to interact with the extension schemas created by various exte...
Definition: schema.h:85
static bool formatDate(char *formatted_date, size_t formatted_date_size, const char *date)
Format a date to a fixed format with milliseconds, microseconds and timezone expressed, examples :
Definition: connection.cpp:1496
Connection()
Create a database connection.
Definition: connection.cpp:335
bool vacuum()
Execute a SQLite VACUUM command on the database.
Definition: connection.cpp:3849
unsigned int purgeReadingsByRows(unsigned long rowcount, unsigned int flags, unsigned long sent, std::string &results)
Purge readings from the reading table leaving a number of rows equal to the parameter rows...
Definition: connection.cpp:2105
void setTrace(bool flag)
Enable or disable the tracing of SQL statements.
Definition: connection.h:50
void setUsedDbId(int dbId)
Append a set of readings to the readings table.
Definition: readings.cpp:645
Definition: connection.h:32
unsigned int purgeReadings(unsigned long age, unsigned int flags, unsigned long sent, std::string &results)
Purge readings from the reading table.
Definition: connection.cpp:1782
int appendReadings(const char *readings)
Append a set of readings to the readings table.
Definition: connection.cpp:1597
bool fetchReadings(unsigned long id, unsigned int blksize, std::string &resultSet)
Fetch a block of readings from the reading table.
Definition: connection.cpp:1757