Fledge
An open source edge computing platform for industrial users
ingest.h
1 #ifndef _INGEST_H
2 #define _INGEST_H
3 /*
4  * Fledge reading ingest.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch, Massimiliano Pinto, Amandeep Singh Arora
11  */
12 #include <storage_client.h>
13 #include <reading.h>
14 #include <logger.h>
15 #include <vector>
16 #include <queue>
17 #include <thread>
18 #include <chrono>
19 #include <mutex>
20 #include <sstream>
21 #include <unordered_set>
22 #include <condition_variable>
23 #include <filter_plugin.h>
24 #include <filter_pipeline.h>
25 #include <asset_tracking.h>
26 #include <service_handler.h>
27 #include <set>
28 #include <perfmonitors.h>
29 
#define SERVICE_NAME "Fledge South"

#define INGEST_SUFFIX "-Ingest"	// Suffix for per service ingest statistic

#define FLUSH_STATS_INTERVAL 5	// Period between flushing of stats to storage (seconds)

#define STATS_UPDATE_FAIL_THRESHOLD 10	// After this many update fails try creating new stats

#define DEPRECATED_CACHE_AGE 600	// Maximum allowed age of the deprecated asset cache (presumably seconds - TODO confirm)
39 
/*
 * Constants related to flow control for async south services.
 *
 */
#define AFC_SLEEP_INCREMENT 20	// Number of milliseconds to wait for readings to drain
#define AFC_SLEEP_MAX 200	// Maximum sleep time in ms between tests
#define AFC_MAX_WAIT 5000	// Maximum amount of time we wait for the queue to drain (presumably ms, like the two above)

// A class used to track and report on the ingest rates of a data stream (defined in ingest_rate.h)
class IngestRate;
49 
56 class Ingest : public ServiceHandler {
57 
58 public:
59  Ingest(StorageClient& storage,
60  const std::string& serviceName,
61  const std::string& pluginName,
62  ManagementClient *mgmtClient);
63  ~Ingest();
64 
65  void ingest(const Reading& reading);
66  void ingest(const std::vector<Reading *> *vec);
67  void start(long timeout, unsigned int threshold);
68  bool running();
69  bool isStopping();
70  bool isRunning() { return !m_shutdown; };
71  void processQueue();
72  void waitForQueue();
73  size_t queueLength();
74  void updateStats(void);
75  int createStatsDbEntry(const std::string& assetName);
76 
77  bool loadFilters(const std::string& categoryName);
78  static void passToOnwardFilter(OUTPUT_HANDLE *outHandle,
79  READINGSET* readings);
80  static void useFilteredData(OUTPUT_HANDLE *outHandle,
81  READINGSET* readings);
82 
83  void setTimeout(const long timeout) { m_timeout = timeout; };
84  void setThreshold(const unsigned int threshold) { m_queueSizeThreshold = threshold; };
85  void configChange(const std::string&, const std::string&);
86  void configChildCreate(const std::string& , const std::string&, const std::string&){};
87  void configChildDelete(const std::string& , const std::string&){};
88  void shutdown() {}; // Satisfy ServiceHandler
89  void restart() {}; // Satisfy ServiceHandler
91  const std::string& assetName,
92  const std::string& event);
94  const std::string& assetName,
95  const std::string&,
96  const unsigned int&);
97  void setStatistics(const std::string& option);
98 
99  std::string getStringFromSet(const std::set<std::string> &dpSet);
100  void setFlowControl(unsigned int lowWater, unsigned int highWater) { m_lowWater = lowWater; m_highWater = highWater; };
101  void flowControl();
102  void setPerfMon(PerformanceMonitor *mon)
103  {
104  m_performance = mon;
105  };
106  void configureRateMonitor(long interval, long factor);
107 
108 private:
109  void signalStatsUpdate() {
110  // Signal stats thread to update stats
111  std::lock_guard<std::mutex> guard(m_statsMutex);
112  m_statsCv.notify_all();
113  };
114  void logDiscardedStat() {
115  std::lock_guard<std::mutex> guard(m_statsMutex);
116  m_discardedReadings++;
117  };
118  long calculateWaitTime();
119  int createServiceStatsDbEntry();
120 
121  StorageClient& m_storage;
122  long m_timeout;
123  bool m_shutdown;
124  unsigned int m_queueSizeThreshold;
125  bool m_running;
126  std::string m_serviceName;
127  std::string m_pluginName;
128  ManagementClient *m_mgtClient;
129  // New data: queued
130  std::vector<Reading *>* m_queue;
131  std::mutex m_qMutex;
132  std::mutex m_statsMutex;
133  std::mutex m_pipelineMutex;
134  std::thread* m_thread;
135  std::thread* m_statsThread;
136  Logger* m_logger;
137  std::condition_variable m_cv;
138  std::condition_variable m_statsCv;
139  // Data ready to be filtered/sent
140  std::vector<Reading *>* m_data;
141  std::vector<std::vector<Reading *>*>
142  m_resendQueues;
143  std::queue<std::vector<Reading *>*>
144  m_fullQueues;
145  std::mutex m_fqMutex;
146  unsigned int m_discardedReadings; // discarded readings since last update to statistics table
147  FilterPipeline* m_filterPipeline;
148 
149  std::unordered_set<std::string> statsDbEntriesCache; // confirmed stats table entries
150  std::map<std::string, int> statsPendingEntries; // pending stats table entries
151  bool m_highLatency; // Flag to indicate we are exceeding latency request
152  bool m_10Latency; // Latency within 10%
153  time_t m_reportedLatencyTime;// Last tiem we reported high latency
154  int m_failCnt;
155  bool m_storageFailed;
156  int m_storesFailed;
157  int m_statsUpdateFails;
158  enum { STATS_BOTH, STATS_ASSET, STATS_SERVICE }
159  m_statisticsOption;
160  unsigned int m_highWater;
161  unsigned int m_lowWater;
162  AssetTrackingTable *m_deprecated;
163  time_t m_deprecatedAgeOut;
164  time_t m_deprecatedAgeOutStorage;
165  PerformanceMonitor *m_performance;
166  std::mutex m_useDataMutex;
167  IngestRate *m_ingestRate;
168 };
169 
170 #endif
Fledge Logger class used to log to syslog.
Definition: logger.h:26
A class to hold a set of asset tracking tuples that allows lookup by name.
Definition: asset_tracking.h:305
void setStatistics(const std::string &option)
Set the statistics option.
Definition: ingest.cpp:1470
bool running()
Check if the ingest process is still running.
Definition: ingest.cpp:412
void configChange(const std::string &, const std::string &)
Configuration change for one of the filters or to the pipeline.
Definition: ingest.cpp:1098
void start(long timeout, unsigned int threshold)
Start the ingest threads.
Definition: ingest.cpp:328
The AssetTrackingTuple class is used to represent an asset tracking tuple.
Definition: asset_tracking.h:47
void processQueue()
Process the queue of readings.
Definition: ingest.cpp:559
bool isStopping()
Check if a shutdown is requested.
Definition: ingest.cpp:421
void unDeprecateAssetTrackingRecord(AssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &event)
Load an up-to-date AssetTracking record for the given parameters and un-deprecate AssetTracking recor...
Definition: ingest.cpp:1209
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
void flowControl()
Implement flow control backoff for the async ingest mechanism.
Definition: ingest.cpp:1512
Class to handle the performance monitors.
Definition: perfmonitors.h:35
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
An asset reading represented as a class.
Definition: reading.h:33
void updateStats(void)
Update statistics for this south service.
Definition: ingest.cpp:163
Reading set class.
Definition: reading_set.h:26
static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current input readings (they have been filtered by all filters).
Definition: ingest.cpp:1069
size_t queueLength()
Return the number of queued readings in the south service.
Definition: ingest.cpp:1189
The ingest class is used to ingest asset readings.
Definition: ingest.h:56
void waitForQueue()
Wait for a period of time to allow the queue to build.
Definition: ingest.cpp:532
~Ingest()
Destructor for the Ingest class.
Definition: ingest.cpp:345
void configureRateMonitor(long interval, long factor)
Configure the ingest rate class with the collection interval and the sigma factor allowed before repo...
Definition: ingest.cpp:1545
void ingest(const Reading &reading)
Add a reading to the reading queue.
Definition: ingest.cpp:431
Client for accessing the storage service.
Definition: storage_client.h:43
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
bool loadFilters(const std::string &categoryName)
Load filter plugins.
Definition: ingest.cpp:992
void unDeprecateStorageAssetTrackingRecord(StorageAssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &, const unsigned int &)
Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate StorageAsset...
Definition: ingest.cpp:1363
A class used to track and report on the ingest rates of a data stream.
Definition: ingest_rate.h:49
Ingest(StorageClient &storage, const std::string &serviceName, const std::string &pluginName, ManagementClient *mgmtClient)
Construct an Ingest class to handle the readings queue.
Definition: ingest.cpp:282
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: ingest.cpp:1038
int createStatsDbEntry(const std::string &assetName)
Create a row for the given assetName in the statistics DB table, if not already present. The key checked/creat...
Definition: ingest.cpp:59
Definition: asset_tracking.h:129