Fledge
An open source edge computing platform for industrial users
connection.h
1 #ifndef _CONNECTION_H
2 #define _CONNECTION_H
3 /*
4  * Fledge storage service.
5  *
6  * Copyright (c) 2017 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch
11  */
12 
13 #include <sql_buffer.h>
14 #include <string>
15 #include <rapidjson/document.h>
16 #include <libpq-fe.h>
17 #include <unordered_map>
18 #include <unordered_set>
19 #include <functional>
20 #include <vector>
21 
22 #define STORAGE_PURGE_RETAIN_ANY 0x0001U
23 #define STORAGE_PURGE_RETAIN_ALL 0x0002U
24 #define STORAGE_PURGE_SIZE 0x0004U
25 
30 #define INSERT_ROW_LIMIT 5000
31 
32 class Connection {
33  public:
34  Connection();
35  ~Connection();
36  bool retrieve(const std::string& schema,
37  const std::string& table, const std::string& condition,
38  std::string& resultSet);
39  bool retrieveReadings(const std::string& condition, std::string& resultSet);
40  int insert(const std::string& table, const std::string& data);
41  int update(const std::string& table, const std::string& data);
42  int deleteRows(const std::string& table, const std::string& condition);
43  int appendReadings(const char *readings);
44  bool fetchReadings(unsigned long id, unsigned int blksize, std::string& resultSet);
45  unsigned int purgeReadings(unsigned long age, unsigned int flags, unsigned long sent, std::string& results);
46  unsigned int purgeReadingsByRows(unsigned long rowcount, unsigned int flags,unsigned long sent, std::string& results);
47  unsigned long purgeOperation(const char *sql, const char *logSection, const char *phase, bool retrieve);
48 
49  long tableSize(const std::string& table);
50  void setTrace(bool flag) { m_logSQL = flag; };
51  static bool formatDate(char *formatted_date, size_t formatted_date_size, const char *date);
52  int create_table_snapshot(const std::string& table, const std::string& id);
53  int load_table_snapshot(const std::string& table, const std::string& id);
54  int delete_table_snapshot(const std::string& table, const std::string& id);
55  bool get_table_snapshots(const std::string& table,
56  std::string& resultSet);
57  bool aggregateQuery(const rapidjson::Value& payload, std::string& resultSet);
58  int create_schema(const std::string &payload);
59  bool findSchemaFromDB(const std::string &service,
60  const std::string &name,
61  std::string &resultSet);
62  unsigned int purgeReadingsAsset(const std::string& asset);
63  void setMaxReadingRows(long rows)
64  {
65  m_maxReadingRows = rows;
66  }
67 
68  private:
69  bool m_logSQL;
70  void raiseError(const char *operation, const char *reason,...);
71  PGconn *dbConnection;
72  void mapResultSet(PGresult *res, std::string& resultSet);
73  bool jsonModifiers(const rapidjson::Value&, SQLBuffer&);
74  bool jsonAggregates(const rapidjson::Value&,
75  const rapidjson::Value&,
77  bool isTableReading = false);
78  bool jsonWhereClause(const rapidjson::Value& whereClause,
79  SQLBuffer&, std::vector<std::string> &asset_codes,
80  bool convertLocaltime = false,
81  std::string prefix = "");
82  bool returnJson(const rapidjson::Value&, SQLBuffer&, SQLBuffer&);
83  char *trim(char *str);
84  const std::string escape_double_quotes(const std::string&);
85  const std::string escape(const std::string&);
86  const std::string double_quote_reserved_column_name(const std::string &column_name);
87  void logSQL(const char *, const char *);
88  bool isFunction(const char *) const;
89  bool selectColumns(const rapidjson::Value& document, SQLBuffer& sql, int level);
90  bool appendTables(const std::string &schema,
91  const rapidjson::Value& document,
92  SQLBuffer& sql,
93  int level);
94  bool processJoinQueryWhereClause(const rapidjson::Value& query,
95  SQLBuffer& sql,
96  std::vector<std::string> &asset_codes,
97  int level);
98 
99  std::string getIndexName(std::string s);
100  bool checkValidDataType(const std::string &s);
101  long m_maxReadingRows;
102 
103 
104  typedef struct{
105  std::string column;
106  std::string type;
107  int sz;
108  bool key = false;
109  } columnRec;
110 
111  // Custom Hash Functor that will compute the hash on the
112  // passed objects column data member
113  struct columnRecHasher
114  {
115  size_t operator()(const columnRec & obj) const
116  {
117  return std::hash<std::string>()(obj.column);
118  }
119  };
120 
121  struct columnRecComparator
122  {
123  bool operator()(const columnRec & obj1, const columnRec & obj2) const
124  {
125  if (obj1.column == obj2.column)
126  return true;
127  return false;
128  }
129  };
130 
131  typedef struct{
132  std::string query;
133  std::string purgeOpArg;
134  std::string logMsg;
135  } sqlQuery;
136 
137  public:
138 
139  bool parseDatabaseStorageSchema(int &version, const std::string &res,
140  std::unordered_map<std::string, std::unordered_set<columnRec, columnRecHasher, columnRecComparator> > &tableColumnMap, std::unordered_map<std::string, std::vector<std::string> > &tableIndexMap, bool &schemaCreationRequest);
141 
142 
143 };
144 #endif
~Connection()
Destructor for the database connection.
Definition: connection.cpp:373
unsigned long purgeOperation(const char *sql, const char *logSection, const char *phase, bool retrieve)
Execute a SQL command for the purge task.
Definition: connection.cpp:2053
int create_schema(const std::string &payload)
Create schema of tables.
Definition: connection.cpp:4139
int insert(const std::string &table, const std::string &data)
Insert data into a table.
Definition: connection.cpp:921
bool findSchemaFromDB(const std::string &service, const std::string &name, std::string &resultSet)
Find existing payload schema from the DB fledge.service_schema table.
Definition: connection.cpp:3818
Buffer class designed to hold SQL statement that can as required but have minimal copy semantics...
Definition: sql_buffer.h:22
static bool formatDate(char *formatted_date, size_t formatted_date_size, const char *date)
Format a date to a fixed format with milliseconds, microseconds and timezone expressed, examples :
Definition: connection.cpp:1496
Connection()
Create a database connection.
Definition: connection.cpp:335
bool parseDatabaseStorageSchema(int &version, const std::string &res, std::unordered_map< std::string, std::unordered_set< columnRec, columnRecHasher, columnRecComparator > > &tableColumnMap, std::unordered_map< std::string, std::vector< std::string > > &tableIndexMap, bool &schemaCreationRequest)
This function parses the fledge.service_schema table payload retrieved in and outputs a set of data s...
Definition: connection.cpp:3877
unsigned int purgeReadingsByRows(unsigned long rowcount, unsigned int flags, unsigned long sent, std::string &results)
Purge readings from the reading table leaving a number of rows equal to the parameter rows...
Definition: connection.cpp:2105
void setTrace(bool flag)
Enable or disable the tracing of SQL statements.
Definition: connection.h:50
Definition: connection.h:32
unsigned int purgeReadings(unsigned long age, unsigned int flags, unsigned long sent, std::string &results)
Purge readings from the reading table.
Definition: connection.cpp:1782
int appendReadings(const char *readings)
Append a set of readings to the readings table.
Definition: connection.cpp:1597
bool fetchReadings(unsigned long id, unsigned int blksize, std::string &resultSet)
Fetch a block of readings from the reading table.
Definition: connection.cpp:1757