 |
Fledge
An open source edge computing platform for industrial users
|
12 #include <storage_client.h>
21 #include <unordered_set>
22 #include <condition_variable>
23 #include <filter_plugin.h>
24 #include <filter_pipeline.h>
25 #include <asset_tracking.h>
26 #include <service_handler.h>
28 #include <perfmonitors.h>
// Compile-time constants used by the south service ingest code.
// NOTE(review): units are stated only where the original comments give them;
// anything marked "presumably" should be confirmed against ingest.cpp.
30 #define SERVICE_NAME "Fledge South"
32 #define INGEST_SUFFIX "-Ingest" // Suffix for per service ingest statistic
34 #define FLUSH_STATS_INTERVAL 5 // Period between flushing of stats to storage (seconds)
36 #define STATS_UPDATE_FAIL_THRESHOLD 10 // After this many failed updates, try creating new stats
38 #define DEPRECATED_CACHE_AGE 600 // Maximum allowed age of the deprecated asset cache (presumably seconds - confirm)
44 #define AFC_SLEEP_INCREMENT 20 // Number of milliseconds to wait for readings to drain
45 #define AFC_SLEEP_MAX 200 // Maximum sleep time in ms between tests
46 #define AFC_MAX_WAIT 5000 // Maximum amount of time we wait for the queue to drain (presumably ms - confirm)
51 enum class ServiceBufferingType {
57 enum class DiscardPolicy {
// Default values for the service buffer settings.
// Default buffering type is UNLIMITED and default discard policy is DISCARD_OLDEST.
// The default size and the minimum size are both 1000.
// NOTE(review): the unit of the size (presumably a count of readings) is not
// visible in this listing - confirm against the setResourceLimit implementation.
63 #define SERVICE_BUFFER_BUFFER_TYPE_DEFAULT ServiceBufferingType::UNLIMITED
64 #define SERVICE_BUFFER_DISCARD_POLICY_DEFAULT DiscardPolicy::DISCARD_OLDEST
65 #define SERVICE_BUFFER_SIZE_DEFAULT 1000
66 #define SERVICE_BUFFER_SIZE_MIN 1000
78 const std::string& serviceName,
79 const std::string& pluginName,
// Ingest a vector of readings, passed as a pointer to a vector of Reading pointers.
// NOTE(review): the single-reading overload is documented as "Add a reading to
// the reading queue"; this overload presumably queues every element of vec.
// Ownership of vec and of the Reading objects is not visible here - confirm in ingest.cpp.
84 void ingest(
const std::vector<Reading *> *vec);
// Start the ingest threads.
// NOTE(review): timeout and threshold presumably seed the same values that
// setTimeout() and setThreshold() store (m_timeout / m_queueSizeThreshold) - confirm in ingest.cpp.
85 void start(
long timeout,
unsigned int threshold);
// Report whether the service is still running: returns the negation of m_shutdown,
// i.e. true until m_shutdown has been set.
88 bool isRunning() {
return !m_shutdown; };
// Store a new timeout value in m_timeout.
// NOTE(review): the unit of timeout is not stated in this listing - confirm against its use in ingest.cpp.
101 void setTimeout(
const long timeout) { m_timeout = timeout; };
// Store a new queue size threshold in m_queueSizeThreshold.
102 void setThreshold(
const unsigned int threshold) { m_queueSizeThreshold = threshold; };
// Configuration change for one of the filters or to the pipeline.
// NOTE(review): the two unnamed string parameters are presumably the category
// name and the new configuration content - confirm in ingest.cpp.
103 void configChange(
const std::string&,
const std::string&);
// Deliberate no-op: the body is empty and all three string arguments are ignored.
104 void configChildCreate(
const std::string& ,
const std::string&,
const std::string&){};
// Deliberate no-op: the body is empty and both string arguments are ignored.
105 void configChildDelete(
const std::string& ,
const std::string&){};
109 const std::string& assetName,
110 const std::string& event);
112 const std::string& assetName,
114 const unsigned int&);
// Build and return a std::string from the given set of strings.
// NOTE(review): the output format (separator, ordering) is defined in the
// implementation and is not visible in this listing.
117 std::string getStringFromSet(
const std::set<std::string> &dpSet);
// Set the flow control water marks: stores lowWater in m_lowWater and highWater in m_highWater.
// NOTE(review): no validation is done here, so lowWater > highWater is accepted as given.
118 void setFlowControl(
unsigned int lowWater,
unsigned int highWater) { m_lowWater = lowWater; m_highWater = highWater; };
// Set the resource limit for the service: buffering type, buffer size and discard policy.
// NOTE(review): serviceBufferSize is unsigned long here while the m_serviceBufferSize
// member is std::atomic<unsigned int>; if the value is stored there, large values
// would be narrowed - confirm in ingest.cpp.
119 void setResourceLimit(ServiceBufferingType serviceBufferingType,
unsigned long serviceBufferSize, DiscardPolicy discardPolicy);
128 bool attachDebugger()
130 if (m_filterPipeline)
132 m_debuggerAttached =
true;
137 void detachDebugger()
139 if (m_filterPipeline)
141 m_debuggerAttached =
false;
142 m_debuggerBufferSize = 1;
146 void setDebuggerBuffer(
unsigned int size)
148 if (m_filterPipeline)
150 m_debuggerBufferSize = size;
154 std::string getDebuggerBuffer()
157 if (m_filterPipeline)
161 void isolate(
bool isolate)
163 std::lock_guard<std::mutex> guard(m_isolateMutex);
168 std::lock_guard<std::mutex> guard(m_isolateMutex);
171 void replayDebugger()
173 if (m_filterPipeline)
178 void signalStatsUpdate() {
180 std::lock_guard<std::mutex> guard(m_statsMutex);
181 m_statsCv.notify_all();
183 void logDiscardedStat() {
184 std::lock_guard<std::mutex> guard(m_statsMutex);
185 m_discardedReadings++;
// Helper declarations; their definitions are not part of this listing.
// NOTE(review): discardOldest()/discardNewest()/reduceFidelity() presumably are the
// per-policy actions for DiscardPolicy, applied by enforceResourceLimits() - confirm in ingest.cpp.
// NOTE(review): the meaning of the int returned by createServiceStatsDbEntry() and the
// unit of the long returned by calculateWaitTime() are not visible here.
187 long calculateWaitTime();
188 int createServiceStatsDbEntry();
189 void discardOldest();
190 void discardNewest();
191 void reduceFidelity();
192 void enforceResourceLimits();
// Data members. Comments below state only what the inline methods in this
// listing show; several declarations are truncated by the listing itself.
// Queue size threshold; assigned by setThreshold().
197 unsigned int m_queueSizeThreshold;
199 std::string m_serviceName;
200 std::string m_pluginName;
203 std::vector<Reading *>* m_queue;
// Held by signalStatsUpdate() while notifying m_statsCv and by logDiscardedStat()
// while incrementing m_discardedReadings.
205 std::mutex m_statsMutex;
206 std::mutex m_pipelineMutex;
207 std::thread* m_thread;
208 std::thread* m_statsThread;
210 std::condition_variable m_cv;
// Notified with notify_all() by signalStatsUpdate().
211 std::condition_variable m_statsCv;
213 std::vector<Reading *>* m_data;
// NOTE(review): the member names for the following two declarations are missing from this listing.
214 std::vector<std::vector<Reading *>*>
216 std::queue<std::vector<Reading *>*>
218 std::mutex m_fqMutex;
// Counter incremented by logDiscardedStat() under m_statsMutex.
219 unsigned int m_discardedReadings;
222 std::unordered_set<std::string> statsDbEntriesCache;
223 std::map<std::string, int> statsPendingEntries;
226 time_t m_reportedLatencyTime;
228 bool m_storageFailed;
230 int m_statsUpdateFails;
// NOTE(review): the member name for the following anonymous enum is missing from this listing.
231 enum { STATS_BOTH, STATS_ASSET, STATS_SERVICE }
// Flow control water marks; both are assigned by setFlowControl().
233 unsigned int m_highWater;
234 unsigned int m_lowWater;
236 time_t m_deprecatedAgeOut;
237 time_t m_deprecatedAgeOutStorage;
239 std::mutex m_useDataMutex;
// Locked with std::lock_guard inside isolate().
241 std::mutex m_isolateMutex;
// Set to true by attachDebugger() and to false by detachDebugger().
243 bool m_debuggerAttached;
// Assigned by setDebuggerBuffer(); reset to 1 by detachDebugger().
244 unsigned int m_debuggerBufferSize;
// Atomic resource limit settings.
// NOTE(review): presumably written by setResourceLimit() - confirm in ingest.cpp.
245 std::atomic<ServiceBufferingType> m_serviceBufferingType;
246 std::atomic<unsigned int> m_serviceBufferSize;
247 std::atomic<DiscardPolicy> m_discardPolicy;
// Default-initialised to false.
248 bool m_resourceGovernorActive{
false};
// Default-initialised to 0.
249 time_t m_lastFidelityReductionTimestamp{0};
bool running()
Check if the ingest process is still running.
Definition: ingest.cpp:415
A class to hold a set of asset tracking tuples that allows lookup by name.
Definition: asset_tracking.h:305
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:520
int createStatsDbEntry(const std::string &assetName)
Create a row for the given assetName in the statistics DB table, if not already present. The key checked/creat...
Definition: ingest.cpp:59
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:419
void waitForQueue()
Wait for a period of time to allow the queue to build.
Definition: ingest.cpp:715
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:491
void ingest(const Reading &reading)
Add a reading to the reading queue.
Definition: ingest.cpp:600
An asset reading represented as a class.
Definition: reading.h:33
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:463
~Ingest()
Destructor for the Ingest class.
Definition: ingest.cpp:348
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
The AssetTrackingTuple class is used to represent an asset tracking tuple.
Definition: asset_tracking.h:47
The ingest class is used to ingest asset readings.
Definition: ingest.h:74
void configureRateMonitor(long interval, long factor)
Configure the ingest rate class with the collection interval and the sigma factor allowed before repo...
Definition: ingest.cpp:1739
void unDeprecateStorageAssetTrackingRecord(StorageAssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &, const unsigned int &)
Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate StorageAsset...
Definition: ingest.cpp:1557
Reading set class.
Definition: reading_set.h:26
A class used to track and report on the ingest rates of a data stream.
Definition: ingest_rate.h:49
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: ingest.cpp:1226
void configChange(const std::string &, const std::string &)
Configuration change for one of the filters or to the pipeline.
Definition: ingest.cpp:1292
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
Ingest(StorageClient &storage, const std::string &serviceName, const std::string &pluginName, ManagementClient *mgmtClient)
Construct an Ingest class to handle the readings queue.
Definition: ingest.cpp:282
void processQueue()
Process the queue of readings.
Definition: ingest.cpp:742
void setResourceLimit(ServiceBufferingType serviceBufferingType, unsigned long serviceBufferSize, DiscardPolicy discardPolicy)
Set the resource limit for the service.
Definition: ingest.cpp:436
void flowControl()
Implement flow control backoff for the async ingest mechanism.
Definition: ingest.cpp:1706
void start(long timeout, unsigned int threshold)
Start the ingest threads.
Definition: ingest.cpp:331
void updateStats(void)
Update statistics for this south service.
Definition: ingest.cpp:163
Definition: asset_tracking.h:129
bool isStopping()
Check if a shutdown is requested.
Definition: ingest.cpp:424
Client for accessing the storage service.
Definition: storage_client.h:43
static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current input readings (they have been filtered by all filters)
Definition: ingest.cpp:1257
Fledge Logger class used to log to syslog.
Definition: logger.h:42
void replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:612
void unDeprecateAssetTrackingRecord(AssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &event)
Load an up-to-date AssetTracking record for the given parameters and un-deprecate AssetTracking recor...
Definition: ingest.cpp:1403
bool loadFilters(const std::string &categoryName)
Load filter plugins.
Definition: ingest.cpp:1174
void setStatistics(const std::string &option)
Set the statistics option.
Definition: ingest.cpp:1664
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20
size_t queueLength()
Return the number of queued readings in the south service.
Definition: ingest.cpp:1383