Fledge
An open source edge computing platform for industrial users
storage_client.h
1 #ifndef _STORAGE_CLIENT_H
2 #define _STORAGE_CLIENT_H
3 /*
4  * Fledge storage client.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch, Massimiliano Pinto
11  */
12 #include <client_http.hpp>
13 #include <reading.h>
14 #include <reading_set.h>
15 #include <resultset.h>
16 #include <purge_result.h>
17 #include <query.h>
18 #include <insert.h>
19 #include <json_properties.h>
20 #include <expression.h>
21 #include <update_modifier.h>
22 #include <logger.h>
23 #include <string>
24 #include <vector>
25 #include <thread>
26 
27 using HttpClient = SimpleWeb::Client<SimpleWeb::HTTP>;
28 
29 #define STREAM_BLK_SIZE 100 // Readings to send per write call to a stream
30 #define STREAM_THRESHOLD 25 // Switch to streamed mode above this number of readings per second
31 
32 // Backup values for repeated storage client exception messages
33 #define SC_INITIAL_BACKOFF 100
34 #define SC_MAX_BACKOFF 1000
35 
36 #define DEFAULT_SCHEMA "fledge"
37 
38 class ManagementClient;
39 
44  public:
45  StorageClient(HttpClient *client);
46  StorageClient(const std::string& hostname, const unsigned short port);
48  ResultSet *queryTable(const std::string& schema, const std::string& tablename, const Query& query);
49  ResultSet *queryTable(const std::string& tablename, const Query& query);
50  ReadingSet *queryTableToReadings(const std::string& tableName, const Query& query);
51  int insertTable(const std::string& schema, const std::string& tableName, const InsertValues& values);
52  int insertTable(const std::string& schema, const std::string& tableName,
53  const std::vector<InsertValues>& values);
54  int insertTable(const std::string& tableName, const std::vector<InsertValues>& values);
55 
56 
57 
58  int updateTable(const std::string& schema, const std::string& tableName, const InsertValues& values,
59  const Where& where, const UpdateModifier *modifier = NULL);
60  int updateTable(const std::string& schema, const std::string& tableName, const JSONProperties& json,
61  const Where& where, const UpdateModifier *modifier = NULL);
62  int updateTable(const std::string& schema, const std::string& tableName, const InsertValues& values,
63  const JSONProperties& json, const Where& where, const UpdateModifier *modifier = NULL);
64  int updateTable(const std::string& schema, const std::string& tableName, const ExpressionValues& values,
65  const Where& where, const UpdateModifier *modifier = NULL);
66  int updateTable(const std::string& schema, const std::string& tableName,
67  std::vector<std::pair<ExpressionValues *, Where *>>& updates, const UpdateModifier *modifier = NULL);
68  int updateTable(const std::string& schema, const std::string& tableName, const InsertValues& values,
69  const ExpressionValues& expressoins, const Where& where, const UpdateModifier *modifier = NULL);
70  int deleteTable(const std::string& schema, const std::string& tableName, const Query& query);
71  int insertTable(const std::string& tableName, const InsertValues& values);
72  int updateTable(const std::string& tableName, const InsertValues& values, const Where& where, const UpdateModifier *modifier = NULL);
73  int updateTable(const std::string& tableName, const JSONProperties& json, const Where& where, const UpdateModifier *modifier = NULL);
74  int updateTable(const std::string& tableName, const InsertValues& values, const JSONProperties& json,
75  const Where& where, const UpdateModifier *modifier = NULL);
76  int updateTable(const std::string& tableName, const ExpressionValues& values, const Where& where,
77  const UpdateModifier *modifier = NULL);
78  int updateTable(const std::string& tableName, std::vector<std::pair<ExpressionValues *, Where *>>& updates,
79  const UpdateModifier *modifier = NULL);
80  int updateTable(const std::string& tableName, const InsertValues& values, const ExpressionValues& expressions,
81  const Where& where, const UpdateModifier *modifier = NULL);
82  int updateTable(const std::string& schema, const std::string& tableName,
83  std::vector<std::pair<InsertValue*, Where* > > &updates, const UpdateModifier *modifier);
84 
85  int updateTable(const std::string& tableName, std::vector<std::pair<InsertValue*, Where*> >& updates,
86  const UpdateModifier *modifier = NULL);
87 
88  int deleteTable(const std::string& tableName, const Query& query);
89  bool readingAppend(Reading& reading);
90  bool readingAppend(const std::vector<Reading *> & readings);
91  ResultSet *readingQuery(const Query& query);
93  ReadingSet *readingFetch(const unsigned long readingId, const unsigned long count);
94  PurgeResult readingPurgeByAge(unsigned long age, unsigned long sent, bool purgeUnsent);
95  PurgeResult readingPurgeBySize(unsigned long size, unsigned long sent, bool purgeUnsent);
96  PurgeResult readingPurgeByAsset(const std::string& asset);
97  bool registerAssetNotification(const std::string& assetName,
98  const std::string& callbackUrl);
99  bool unregisterAssetNotification(const std::string& assetName,
100  const std::string& callbackUrl);
101  bool registerTableNotification(const std::string& tableName, const std::string& key,
102  std::vector<std::string> keyValues, const std::string& operation, const std::string& callbackUrl);
103  bool unregisterTableNotification(const std::string& tableName, const std::string& key,
104  std::vector<std::string> keyValues, const std::string& operation, const std::string& callbackUrl);
105  void registerManagement(ManagementClient *mgmnt) { m_management = mgmnt; };
106  bool createSchema(const std::string&);
107  bool deleteHttpClient();
108 
109  private:
110  void handleUnexpectedResponse(const char *operation,
111  const std::string& table,
112  const std::string& responseCode,
113  const std::string& payload);
114  void handleUnexpectedResponse(const char *operation,
115  const std::string& responseCode,
116  const std::string& payload);
117  void handleException(const std::exception& ex, const char *operation, ...);
118  HttpClient *getHttpClient(void);
119  bool openStream();
120  bool streamReadings(const std::vector<Reading *> & readings);
121 
122  std::ostringstream m_urlbase;
123  std::string m_host;
124  std::map<std::thread::id, HttpClient *> m_client_map;
125  std::map<std::thread::id, std::atomic<int>> m_seqnum_map;
126  Logger *m_logger;
127  pid_t m_pid;
128  bool m_streaming;
129  int m_stream;
130  uint32_t m_readingBlock;
131  std::string m_lastException;
132  int m_exRepeat;
133  int m_backoff;
134  ManagementClient *m_management;
135 };
136 
137 #endif
138 
Fledge Logger class used to log to syslog.
Definition: logger.h:26
ReadingSet * readingQueryToReadings(const Query &query)
Perform a generic query against the readings data, returning ReadingSet object.
Definition: storage_client.cpp:302
~StorageClient()
Destructor for storage client.
Definition: storage_client.cpp:75
ResultSet * readingQuery(const Query &query)
Perform a generic query against the readings data.
Definition: storage_client.cpp:267
PurgeResult readingPurgeBySize(unsigned long size, unsigned long sent, bool purgeUnsent)
Purge the readings by size.
Definition: storage_client.cpp:409
Update modifier.
Definition: update_modifier.h:18
bool registerAssetNotification(const std::string &assetName, const std::string &callbackUrl)
Register interest for a Reading asset name.
Definition: storage_client.cpp:1216
Class that defines JSON properties for update.
Definition: json_properties.h:53
bool registerTableNotification(const std::string &tableName, const std::string &key, std::vector< std::string > keyValues, const std::string &operation, const std::string &callbackUrl)
Register interest for a table.
Definition: storage_client.cpp:1300
bool readingAppend(Reading &reading)
Append a single reading.
Definition: storage_client.cpp:145
Storage layer query container.
Definition: query.h:25
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
Result set.
Definition: resultset.h:32
bool unregisterTableNotification(const std::string &tableName, const std::string &key, std::vector< std::string > keyValues, const std::string &operation, const std::string &callbackUrl)
Unregister interest for a table name.
Definition: storage_client.cpp:1356
StorageClient(HttpClient *client)
Storage Client constructor stores the provided HttpClient into the map.
Definition: storage_client.cpp:61
An asset reading represented as a class.
Definition: reading.h:33
Reading set class.
Definition: reading_set.h:26
ReadingSet * queryTableToReadings(const std::string &tableName, const Query &query)
Query a table and return a ReadingSet pointer.
Definition: storage_client.cpp:518
bool createSchema(const std::string &)
Function to create Storage Schema.
Definition: storage_client.cpp:1752
Client for accessing the storage service.
Definition: storage_client.h:43
ResultSet * queryTable(const std::string &schema, const std::string &tablename, const Query &query)
Query a table.
Definition: storage_client.cpp:482
PurgeResult readingPurgeByAsset(const std::string &asset)
Purge the readings by asset name.
Definition: storage_client.cpp:439
bool deleteHttpClient()
Delete HttpClient object for current thread.
Definition: storage_client.cpp:90
Definition: expression.h:68
PurgeResult readingPurgeByAge(unsigned long age, unsigned long sent, bool purgeUnsent)
Purge the readings by age.
Definition: storage_client.cpp:376
int deleteTable(const std::string &schema, const std::string &tableName, const Query &query)
Delete from a table.
Definition: storage_client.cpp:1122
bool unregisterAssetNotification(const std::string &assetName, const std::string &callbackUrl)
Unregister interest for a Reading asset name.
Definition: storage_client.cpp:1258
Definition: purge_result.h:22
Definition: insert.h:145
ReadingSet * readingFetch(const unsigned long readingId, const unsigned long count)
Retrieve a set of readings for sending on the northbound interface of Fledge.
Definition: storage_client.cpp:338
Where clause in a selection of records.
Definition: where.h:31