Fledge
An open source edge computing platform for industrial users
ingest.h
1 #ifndef _INGEST_H
2 #define _INGEST_H
3 /*
4  * Fledge reading ingest.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch, Massimiliano Pinto, Amandeep Singh Arora
11  */
12 #include <storage_client.h>
13 #include <reading.h>
14 #include <logger.h>
15 #include <vector>
16 #include <queue>
17 #include <thread>
18 #include <chrono>
19 #include <mutex>
20 #include <sstream>
21 #include <unordered_set>
22 #include <condition_variable>
23 #include <filter_plugin.h>
24 #include <filter_pipeline.h>
25 #include <asset_tracking.h>
26 #include <service_handler.h>
27 #include <set>
28 #include <perfmonitors.h>
29 
// Name string for this service type.
30 #define SERVICE_NAME "Fledge South"
31 
32 #define INGEST_SUFFIX "-Ingest" // Suffix for per service ingest statistic
33 
34 #define FLUSH_STATS_INTERVAL 5 // Period between flushing of stats to storage (seconds)
35 
36 #define STATS_UPDATE_FAIL_THRESHOLD 10 // After this many update fails try creating new stats
37 
// NOTE(review): unit is not stated; presumably seconds - confirm against the user of this value.
38 #define DEPRECATED_CACHE_AGE 600 // Maximum allowed age of the deprecated asset cache
39 
40 /*
41  * Constants related to flow control for async south services.
42  *
43  */
44 #define AFC_SLEEP_INCREMENT 20 // Number of milliseconds to wait for readings to drain
45 #define AFC_SLEEP_MAX 200 // Maximum sleep time in ms between tests
// NOTE(review): presumably milliseconds, like the other AFC_ values - confirm.
46 #define AFC_MAX_WAIT 5000 // Maximum amount of time we wait for the queue to drain
47 
48 class IngestRate;
49 
// Buffering mode selector; passed to Ingest::setResourceLimit() together
// with a buffer size and a discard policy.
50 // Enum for service buffering type
51 enum class ServiceBufferingType {
52  UNLIMITED, // presumably no cap on buffered readings - TODO confirm
53  LIMITED // presumably capped at the configured buffer size - TODO confirm
54 };
55 
// Policy selector passed to Ingest::setResourceLimit(). Ingest declares
// private helpers with matching names (discardOldest, reduceFidelity,
// discardNewest); presumably each enumerator selects the like-named
// helper - TODO confirm in ingest.cpp.
56 // Enum for discard policy
57 enum class DiscardPolicy {
58  DISCARD_OLDEST,
59  REDUCE_FIDELITY,
60  DISCARD_NEWEST
61 };
62 
// Defaults for the service buffer settings: buffering type, discard
// policy and buffer size, plus the smallest accepted buffer size.
// NOTE(review): the size unit is not stated here; presumably a count of
// readings - confirm against Ingest::setResourceLimit().
63 #define SERVICE_BUFFER_BUFFER_TYPE_DEFAULT ServiceBufferingType::UNLIMITED
64 #define SERVICE_BUFFER_DISCARD_POLICY_DEFAULT DiscardPolicy::DISCARD_OLDEST
65 #define SERVICE_BUFFER_SIZE_DEFAULT 1000
66 #define SERVICE_BUFFER_SIZE_MIN 1000
67 
// The ingest class is used to ingest asset readings. It derives from
// ServiceHandler and declares the reading queues, the statistics
// bookkeeping, the filter pipeline handle and the debugger, flow-control
// and resource-limit controls used by a south service.
//
// NOTE(review): every line of this listing carries a gutter number, and
// listing lines 68-73, 108 and 111 are absent, so the two un-deprecate
// declarations below appear without their first line.
74 class Ingest : public ServiceHandler {
75 
76 public:
    // Construct an Ingest class to handle the readings queue.
77  Ingest(StorageClient& storage,
78  const std::string& serviceName,
79  const std::string& pluginName,
80  ManagementClient *mgmtClient);
    // Destructor for the Ingest class.
81  ~Ingest();
82 
    // Add a reading to the reading queue.
83  void ingest(const Reading& reading);
    // Overload taking a vector of reading pointers.
84  void ingest(const std::vector<Reading *> *vec);
    // Start the ingest threads.
85  void start(long timeout, unsigned int threshold);
    // Check if the ingest process is still running.
86  bool running();
    // Check if a shutdown is requested.
87  bool isStopping();
    // True while m_shutdown is false.
88  bool isRunning() { return !m_shutdown; };
    // Process the queue of readings.
89  void processQueue();
    // Wait for a period of time to allow the queue to build.
90  void waitForQueue();
    // Return the number of queued readings in the south service.
91  size_t queueLength();
    // Update statistics for this south service.
92  void updateStats(void);
    // Create a row for the given assetName in the statistics DB table,
    // if not present already.
93  int createStatsDbEntry(const std::string& assetName);
94 
    // Load filter plugins.
95  bool loadFilters(const std::string& categoryName);
    // Pass the current readings set to the next filter in the pipeline.
96  static void passToOnwardFilter(OUTPUT_HANDLE *outHandle,
97  READINGSET* readings);
    // Use the current input readings (they have been filtered by all filters).
98  static void useFilteredData(OUTPUT_HANDLE *outHandle,
99  READINGSET* readings);
100 
    // Simple setters: store the value in m_timeout / m_queueSizeThreshold.
101  void setTimeout(const long timeout) { m_timeout = timeout; };
102  void setThreshold(const unsigned int threshold) { m_queueSizeThreshold = threshold; };
    // Configuration change for one of the filters or to the pipeline.
103  void configChange(const std::string&, const std::string&);
    // The next four ServiceHandler entry points are intentionally empty here.
104  void configChildCreate(const std::string& , const std::string&, const std::string&){};
105  void configChildDelete(const std::string& , const std::string&){};
106  void shutdown() {}; // Satisfy ServiceHandler
107  void restart() {}; // Satisfy ServiceHandler
    // NOTE(review): listing line 108 is absent. Per the reference entries at
    // the end of this listing, the parameters below belong to
    //   void unDeprecateAssetTrackingRecord(AssetTrackingTuple *currentTuple, ...)
    // which loads an up-to-date AssetTracking record for the given
    // parameters and un-deprecates it.
109  const std::string& assetName,
110  const std::string& event);
    // NOTE(review): listing line 111 is absent. Per the reference entries at
    // the end of this listing, the parameters below belong to
    //   void unDeprecateStorageAssetTrackingRecord(StorageAssetTrackingTuple *currentTuple, ...)
    // which loads an up-to-date StorageAssetTracking record for the given
    // parameters and un-deprecates it.
112  const std::string& assetName,
113  const std::string&,
114  const unsigned int&);
    // Set the statistics option.
115  void setStatistics(const std::string& option);
116 
    // Builds a string from a set of strings; the output format is defined
    // in the implementation file and is not visible here.
117  std::string getStringFromSet(const std::set<std::string> &dpSet);
    // Store the low and high water marks in m_lowWater / m_highWater.
118  void setFlowControl(unsigned int lowWater, unsigned int highWater) { m_lowWater = lowWater; m_highWater = highWater; };
    // Set the resource limit for the service.
119  void setResourceLimit(ServiceBufferingType serviceBufferingType, unsigned long serviceBufferSize, DiscardPolicy discardPolicy);
    // Implement flow control backoff for the async ingest mechanism.
120  void flowControl();
    // Store the performance monitor pointer; ownership is not expressed here.
121  void setPerfMon(PerformanceMonitor *mon)
122  {
123  m_performance = mon;
124  };
    // Configure the ingest rate class with the collection interval and the
    // sigma factor.
125  void configureRateMonitor(long interval, long factor);
126 
127  // Debugger entry points
    // Attach the debugger to the filter pipeline. Returns false when no
    // pipeline is set; otherwise returns the pipeline's own result.
    // NOTE(review): m_debuggerAttached is set before the pipeline call, so
    // it stays true even if that call returns false - confirm intended.
128  bool attachDebugger()
129  {
130  if (m_filterPipeline)
131  {
132  m_debuggerAttached = true;
133  return m_filterPipeline->attachDebugger();
134  }
135  return false;
136  };
    // Detach the debugger from the pipeline and reset the recorded buffer
    // size to 1. Does nothing when no pipeline is set.
137  void detachDebugger()
138  {
139  if (m_filterPipeline)
140  {
141  m_debuggerAttached = false;
142  m_debuggerBufferSize = 1;
143  m_filterPipeline->detachDebugger();
144  }
145  };
    // Record the debugger buffer size and forward it to the pipeline.
    // Does nothing when no pipeline is set.
146  void setDebuggerBuffer(unsigned int size)
147  {
148  if (m_filterPipeline)
149  {
150  m_debuggerBufferSize = size;
151  m_filterPipeline->setDebuggerBuffer(size);
152  }
153  };
    // Return the pipeline's debugger buffer contents, or an empty string
    // when no pipeline is set.
154  std::string getDebuggerBuffer()
155  {
156  std::string rval;
157  if (m_filterPipeline)
158  rval = m_filterPipeline->getDebuggerBuffer();
159  return rval;
160  };
    // Set the isolate flag while holding m_isolateMutex.
161  void isolate(bool isolate)
162  {
163  std::lock_guard<std::mutex> guard(m_isolateMutex);
164  m_isolate = isolate;
165  };
    // Read the isolate flag while holding m_isolateMutex.
166  bool isolated()
167  {
168  std::lock_guard<std::mutex> guard(m_isolateMutex);
169  return m_isolate;
170  };
    // Ask the pipeline to replay its saved debugger data; does nothing when
    // no pipeline is set.
171  void replayDebugger()
172  {
173  if (m_filterPipeline)
174  m_filterPipeline->replayDebugger();
175  };
176 
177 private:
    // Wake every waiter on m_statsCv, holding m_statsMutex while notifying.
178  void signalStatsUpdate() {
179  // Signal stats thread to update stats
180  std::lock_guard<std::mutex> guard(m_statsMutex);
181  m_statsCv.notify_all();
182  };
    // Count one discarded reading, under m_statsMutex.
183  void logDiscardedStat() {
184  std::lock_guard<std::mutex> guard(m_statsMutex);
185  m_discardedReadings++;
186  };
187  long calculateWaitTime();
188  int createServiceStatsDbEntry();
    // Helpers named after the DiscardPolicy values; presumably invoked from
    // enforceResourceLimits() - TODO confirm in ingest.cpp.
189  void discardOldest();
190  void discardNewest();
191  void reduceFidelity();
192  void enforceResourceLimits();
193 
194  StorageClient& m_storage;
195  long m_timeout; // set by setTimeout()
196  bool m_shutdown; // isRunning() returns its negation
197  unsigned int m_queueSizeThreshold; // set by setThreshold()
198  bool m_running;
199  std::string m_serviceName;
200  std::string m_pluginName;
201  ManagementClient *m_mgtClient;
202  // New data: queued
203  std::vector<Reading *>* m_queue;
204  std::mutex m_qMutex;
205  std::mutex m_statsMutex; // held by signalStatsUpdate() and logDiscardedStat()
206  std::mutex m_pipelineMutex;
207  std::thread* m_thread;
208  std::thread* m_statsThread;
209  Logger* m_logger;
210  std::condition_variable m_cv;
211  std::condition_variable m_statsCv; // notified by signalStatsUpdate()
212  // Data ready to be filtered/sent
213  std::vector<Reading *>* m_data;
214  std::vector<std::vector<Reading *>*>
215  m_resendQueues;
216  std::queue<std::vector<Reading *>*>
217  m_fullQueues;
218  std::mutex m_fqMutex;
219  unsigned int m_discardedReadings; // discarded readings since last update to statistics table
220  FilterPipeline* m_filterPipeline; // may be null; the debugger entry points check it
221 
222  std::unordered_set<std::string> statsDbEntriesCache; // confirmed stats table entries
    // NOTE(review): std::map is used here but <map> is not among the
    // includes visible in this file; presumably it arrives transitively.
223  std::map<std::string, int> statsPendingEntries; // pending stats table entries
224  bool m_highLatency; // Flag to indicate we are exceeding latency request
225  bool m_10Latency; // Latency within 10%
226  time_t m_reportedLatencyTime;// Last time we reported high latency
227  int m_failCnt;
228  bool m_storageFailed;
229  int m_storesFailed;
230  int m_statsUpdateFails;
231  enum { STATS_BOTH, STATS_ASSET, STATS_SERVICE }
232  m_statisticsOption;
233  unsigned int m_highWater; // set by setFlowControl()
234  unsigned int m_lowWater; // set by setFlowControl()
235  AssetTrackingTable *m_deprecated;
236  time_t m_deprecatedAgeOut;
237  time_t m_deprecatedAgeOutStorage;
238  PerformanceMonitor *m_performance; // set by setPerfMon()
239  std::mutex m_useDataMutex;
240  IngestRate *m_ingestRate;
241  std::mutex m_isolateMutex; // guards m_isolate
242  bool m_isolate;
243  bool m_debuggerAttached;
244  unsigned int m_debuggerBufferSize; // reset to 1 by detachDebugger()
    // NOTE(review): std::atomic is used below but <atomic> is not among the
    // includes visible in this file; presumably it arrives transitively -
    // confirm, or include it directly.
245  std::atomic<ServiceBufferingType> m_serviceBufferingType;
246  std::atomic<unsigned int> m_serviceBufferSize;
247  std::atomic<DiscardPolicy> m_discardPolicy;
248  bool m_resourceGovernorActive{false}; // Tracks if the resource governor is active
249  time_t m_lastFidelityReductionTimestamp{0}; // Used for "Reduce Fidelity"
250 };
251 
252 #endif
Ingest::running
bool running()
Check if the ingest process is still running.
Definition: ingest.cpp:415
AssetTrackingTable
A class to hold a set of asset tracking tuples that allows lookup by name.
Definition: asset_tracking.h:305
FilterPipeline::getDebuggerBuffer
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:520
Ingest::createStatsDbEntry
int createStatsDbEntry(const std::string &assetName)
Create a row for the given assetName in the statistics DB table, if not present already. The key checked/creat...
Definition: ingest.cpp:59
FilterPipeline::attachDebugger
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:419
Ingest::waitForQueue
void waitForQueue()
Wait for a period of time to allow the queue to build.
Definition: ingest.cpp:715
FilterPipeline::setDebuggerBuffer
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:491
Ingest::ingest
void ingest(const Reading &reading)
Add a reading to the reading queue.
Definition: ingest.cpp:600
Reading
An asset reading represented as a class.
Definition: reading.h:33
FilterPipeline::detachDebugger
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:463
Ingest::~Ingest
~Ingest()
Destructor for the Ingest class.
Definition: ingest.cpp:348
FilterPipeline
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
AssetTrackingTuple
The AssetTrackingTuple class is used to represent an asset tracking tuple.
Definition: asset_tracking.h:47
Ingest
The ingest class is used to ingest asset readings.
Definition: ingest.h:74
Ingest::configureRateMonitor
void configureRateMonitor(long interval, long factor)
Configure the ingest rate class with the collection interval and the sigma factor allowed before repo...
Definition: ingest.cpp:1739
Ingest::unDeprecateStorageAssetTrackingRecord
void unDeprecateStorageAssetTrackingRecord(StorageAssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &, const unsigned int &)
Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate StorageAsset...
Definition: ingest.cpp:1557
ReadingSet
Reading set class.
Definition: reading_set.h:26
IngestRate
A class used to track and report on the ingest rates of a data stream.
Definition: ingest_rate.h:49
Ingest::passToOnwardFilter
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: ingest.cpp:1226
Ingest::configChange
void configChange(const std::string &, const std::string &)
Configuration change for one of the filters or to the pipeline.
Definition: ingest.cpp:1292
ManagementClient
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
Ingest::Ingest
Ingest(StorageClient &storage, const std::string &serviceName, const std::string &pluginName, ManagementClient *mgmtClient)
Construct an Ingest class to handle the readings queue.
Definition: ingest.cpp:282
Ingest::processQueue
void processQueue()
Process the queue of readings.
Definition: ingest.cpp:742
Ingest::setResourceLimit
void setResourceLimit(ServiceBufferingType serviceBufferingType, unsigned long serviceBufferSize, DiscardPolicy discardPolicy)
Set the resource limit for the service.
Definition: ingest.cpp:436
Ingest::flowControl
void flowControl()
Implement flow control backoff for the async ingest mechanism.
Definition: ingest.cpp:1706
Ingest::start
void start(long timeout, unsigned int threshold)
Start the ingest threads.
Definition: ingest.cpp:331
Ingest::updateStats
void updateStats(void)
Update statistics for this south service.
Definition: ingest.cpp:163
StorageAssetTrackingTuple
Definition: asset_tracking.h:129
Ingest::isStopping
bool isStopping()
Check if a shutdown is requested.
Definition: ingest.cpp:424
StorageClient
Client for accessing the storage service.
Definition: storage_client.h:43
Ingest::useFilteredData
static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current input readings (they have been filtered by all filters)
Definition: ingest.cpp:1257
Logger
Fledge Logger class used to log to syslog.
Definition: logger.h:42
FilterPipeline::replayDebugger
void replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:612
PerformanceMonitor
Class to handle the performance monitors.
Definition: perfmonitors.h:35
Ingest::unDeprecateAssetTrackingRecord
void unDeprecateAssetTrackingRecord(AssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &event)
Load an up-to-date AssetTracking record for the given parameters and un-deprecate AssetTracking recor...
Definition: ingest.cpp:1403
Ingest::loadFilters
bool loadFilters(const std::string &categoryName)
Load filter plugins.
Definition: ingest.cpp:1174
Ingest::setStatistics
void setStatistics(const std::string &option)
Set the statistics option.
Definition: ingest.cpp:1664
ServiceHandler
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20
Ingest::queueLength
size_t queueLength()
Return the number of queued readings in the south service.
Definition: ingest.cpp:1383