Fledge
An open source edge computing platform for industrial users
connection.h
1 #ifndef _CONNECTION_H
2 #define _CONNECTION_H
3 /*
4  * Fledge storage service.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Massimiliano Pinto
11  */
12 
13 #include <sql_buffer.h>
14 #include <string>
15 #include <rapidjson/document.h>
16 #include <sqlite3.h>
17 #include <mutex>
18 #include <vector>
19 #include <reading_stream.h>
20 #ifndef MEMORY_READING_PLUGIN
21 #include <schema.h>
22 #endif
23 
24 #define _DB_NAME "/fledge.db"
25 #define READINGS_DB_NAME_BASE "readings"
26 #define READINGS_DB_FILE_NAME "/" READINGS_DB_NAME_BASE ".db"
27 #define READINGS_DB READINGS_DB_NAME_BASE
28 #define READINGS_TABLE "readings"
29 #define READINGS_TABLE_MEM READINGS_TABLE
30 
31 #define MAX_RETRIES 80 // Maximum no. of retries when a lock is encountered
32 #define RETRY_BACKOFF 100 // Multipler to backoff DB retry on lock
33 #define RETRY_BACKOFF_EXEC 1000 // Multipler to backoff DB retry on lock
34 
35 #define LEN_BUFFER_DATE 100
36 #define F_TIMEH24_S "%H:%M:%S"
37 #define F_DATEH24_S "%Y-%m-%d %H:%M:%S"
38 #define F_DATEH24_M "%Y-%m-%d %H:%M"
39 #define F_DATEH24_H "%Y-%m-%d %H"
40 // This is the default datetime format in Fledge: 2018-05-03 18:15:00.622
41 #define F_DATEH24_MS "%Y-%m-%d %H:%M:%f"
42 // Format up to seconds
43 #define F_DATEH24_SEC "%Y-%m-%d %H:%M:%S"
44 #define SQLITE3_NOW "strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')"
45 // The default precision is milliseconds, it adds microseconds and timezone
46 #define SQLITE3_NOW_READING "strftime('%Y-%m-%d %H:%M:%f000+00:00', 'now')"
47 #define SQLITE3_FLEDGE_DATETIME_TYPE "DATETIME"
48 
49 // Set plugin name for log messages
50 #ifndef PLUGIN_LOG_NAME
51 #define PLUGIN_LOG_NAME "SQLite3"
52 #endif
53 
54 int dateCallback(void *data, int nCols, char **colValues, char **colNames);
55 bool applyColumnDateFormat(const std::string& inFormat,
56  const std::string& colName,
57  std::string& outFormat,
58  bool roundMs = false);
59 
60 bool applyColumnDateFormatLocaltime(const std::string& inFormat,
61  const std::string& colName,
62  std::string& outFormat,
63  bool roundMs = false);
64 
65 int rowidCallback(void *data,
66  int nCols,
67  char **colValues,
68  char **colNames);
69 
70 int selectCallback(void *data,
71  int nCols,
72  char **colValues,
73  char **colNames);
74 
75 int countCallback(void *data,
76  int nCols,
77  char **colValues,
78  char **colNames);
79 
80 bool applyDateFormat(const std::string& inFormat, std::string& outFormat);
81 
82 class Connection {
83  public:
84  Connection();
85  ~Connection();
86 #ifndef SQLITE_SPLIT_READINGS
87  bool createSchema(const std::string& schema);
88  bool retrieve(const std::string& schema,
89  const std::string& table,
90  const std::string& condition,
91  std::string& resultSet);
92  int insert(const std::string& schema,
93  const std::string& table,
94  const std::string& data);
95  int update(const std::string& schema,
96  const std::string& table,
97  const std::string& data);
98  int deleteRows(const std::string& schema,
99  const std::string& table,
100  const std::string& condition);
101  int create_table_snapshot(const std::string& table, const std::string& id);
102  int load_table_snapshot(const std::string& table, const std::string& id);
103  int delete_table_snapshot(const std::string& table, const std::string& id);
104  bool get_table_snapshots(const std::string& table, std::string& resultSet);
105 #endif
106  int appendReadings(const char *readings);
107  int readingStream(ReadingStream **readings, bool commit);
108  bool fetchReadings(unsigned long id, unsigned int blksize,
109  std::string& resultSet);
110  bool retrieveReadings(const std::string& condition,
111  std::string& resultSet);
112  unsigned int purgeReadings(unsigned long age, unsigned int flags,
113  unsigned long sent, std::string& results);
114  unsigned int purgeReadingsByRows(unsigned long rowcount, unsigned int flags,
115  unsigned long sent, std::string& results);
116  long tableSize(const std::string& table);
117  void setTrace(bool);
118  bool formatDate(char *formatted_date, size_t formatted_date_size, const char *date);
119  bool aggregateQuery(const rapidjson::Value& payload, std::string& resultSet);
120  bool getNow(std::string& Now);
121  unsigned int purgeReadingsAsset(const std::string& asset);
122  bool vacuum();
123 #ifdef MEMORY_READING_PLUGIN
124  bool loadDatabase(const std::string& filname);
125  bool saveDatabase(const std::string& filname);
126 #endif
127  void setPurgeBlockSize(unsigned long purgeBlockSize)
128  {
129  m_purgeBlockSize = purgeBlockSize;
130  };
131 
132  private:
133 #ifndef MEMORY_READING_PLUGIN
134  SchemaManager *m_schemaManager;
135 #endif
136  bool m_streamOpenTransaction;
137  int m_queuing;
138  std::mutex m_qMutex;
139  unsigned long m_purgeBlockSize;
140  std::string operation(const char *sql);
141  int SQLexec(sqlite3 *db, const std::string& table, const char *sql,
142  int (*callback)(void*,int,char**,char**),
143  void *cbArg, char **errmsg);
144  int SQLstep(sqlite3_stmt *statement);
145  bool m_logSQL;
146  void raiseError(const char *operation, const char *reason,...);
147  sqlite3 *dbHandle;
148  int mapResultSet(void *res, std::string& resultSet);
149  bool jsonWhereClause(const rapidjson::Value& whereClause,
150  SQLBuffer&, std::vector<std::string> &asset_codes,
151  bool convertLocaltime = false,
152  std::string prefix = "");
153  bool jsonModifiers(const rapidjson::Value&, SQLBuffer&, bool isTableReading = false);
154  bool jsonAggregates(const rapidjson::Value&,
155  const rapidjson::Value&,
156  SQLBuffer&,
157  SQLBuffer&,
158  bool isTableReading = false);
159  bool returnJson(const rapidjson::Value&, SQLBuffer&, SQLBuffer&);
160  char *trim(char *str);
161  const std::string
162  escape(const std::string&);
163  bool applyColumnDateTimeFormat(sqlite3_stmt *pStmt,
164  int i,
165  std::string& newDate);
166  void logSQL(const char *, const char *);
167  bool appendTables(const std::string& schema, const rapidjson::Value& document, SQLBuffer& sql, int level);
168  bool processJoinQueryWhereClause(const rapidjson::Value& query,
169  SQLBuffer& sql,
170  std::vector<std::string> &asset_codes,
171  int level);
172  bool selectColumns(const rapidjson::Value& document, SQLBuffer& sql, int level);
173 };
174 #endif
int readingStream(ReadingStream **readings, bool commit)
Append a stream of readings to SQLite db.
Definition: readings.cpp:396
bool createSchema(const std::string &schema)
Create schema and populate with tables and indexes as defined in the JSON schema definition.
Definition: connection.cpp:3841
~Connection()
Destructor for the database connection.
Definition: connection.cpp:373
Definition: reading_stream.h:42
int insert(const std::string &table, const std::string &data)
Insert data into a table.
Definition: connection.cpp:921
Buffer class designed to hold SQL statement that can as required but have minimal copy semantics...
Definition: sql_buffer.h:22
The singleton SchemaManager class used to interact with the extension schemas created by various exte...
Definition: schema.h:85
static bool formatDate(char *formatted_date, size_t formatted_date_size, const char *date)
Format a date to a fixed format with milliseconds, microseconds and timezone expressed, examples :
Definition: connection.cpp:1496
Connection()
Create a database connection.
Definition: connection.cpp:335
bool vacuum()
Execute a SQLite VACUUM command on the database.
Definition: connection.cpp:3849
unsigned int purgeReadingsByRows(unsigned long rowcount, unsigned int flags, unsigned long sent, std::string &results)
Purge readings from the reading table leaving a number of rows equal to the parameter rows...
Definition: connection.cpp:2105
void setTrace(bool flag)
Enable or disable the tracing of SQL statements.
Definition: connection.h:50
Definition: connection.h:32
unsigned int purgeReadings(unsigned long age, unsigned int flags, unsigned long sent, std::string &results)
Purge readings from the reading table.
Definition: connection.cpp:1782
int appendReadings(const char *readings)
Append a set of readings to the readings table.
Definition: connection.cpp:1597
bool fetchReadings(unsigned long id, unsigned int blksize, std::string &resultSet)
Fetch a block of readings from the reading table.
Definition: connection.cpp:1757