Fledge
An open source edge computing platform for industrial users
The ingest class is used to ingest asset readings.
#include <ingest.h>
Public Member Functions | |
Ingest (StorageClient &storage, const std::string &serviceName, const std::string &pluginName, ManagementClient *mgmtClient) | |
Construct an Ingest class to handle the readings queue. | |
~Ingest () | |
Destructor for the Ingest class. | |
void | ingest (const Reading &reading) |
Add a reading to the reading queue. | |
void | ingest (const std::vector< Reading *> *vec) |
Add a set of readings to the reading queue. | |
void | start (long timeout, unsigned int threshold) |
Start the ingest threads. | |
bool | running () |
Check if the ingest process is still running. | |
bool | isStopping () |
Check if a shutdown is requested. | |
bool | isRunning () |
void | processQueue () |
Process the queue of readings. | |
void | waitForQueue () |
Wait for a period of time to allow the queue to build. | |
size_t | queueLength () |
Return the number of queued readings in the south service. | |
void | updateStats (void) |
Update statistics for this south service. | |
int | createStatsDbEntry (const std::string &assetName) |
Create a row for the given assetName in the statistics DB table if one is not already present. The key checked/created in the table is "<assetName>". | |
bool | loadFilters (const std::string &categoryName) |
Load filter plugins. | |
void | setTimeout (const long timeout) |
void | setThreshold (const unsigned int threshold) |
void | configChange (const std::string &, const std::string &) |
Configuration change for one of the filters or to the pipeline. | |
void | configChildCreate (const std::string &, const std::string &, const std::string &) |
void | configChildDelete (const std::string &, const std::string &) |
void | shutdown () |
void | restart () |
void | unDeprecateAssetTrackingRecord (AssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &event) |
Load an up-to-date AssetTracking record for the given parameters and un-deprecate it if it has been found to be deprecated. The existing cache element is updated. | |
void | unDeprecateStorageAssetTrackingRecord (StorageAssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &, const unsigned int &) |
Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate it if it has been found to be deprecated. The existing cache element is updated. | |
void | setStatistics (const std::string &option) |
Set the statistics option. | |
std::string | getStringFromSet (const std::set< std::string > &dpSet) |
void | setFlowControl (unsigned int lowWater, unsigned int highWater) |
void | flowControl () |
Implement flow control backoff for the async ingest mechanism. | |
void | setPerfMon (PerformanceMonitor *mon) |
void | configureRateMonitor (long interval, long factor) |
Configure the ingest rate class with the collection interval and the sigma factor allowed before reporting. | |
virtual bool | securityChange (const std::string &payload) |
Static Public Member Functions | |
static void | passToOnwardFilter (OUTPUT_HANDLE *outHandle, READINGSET *readings) |
Pass the current readings set to the next filter in the pipeline. | |
static void | useFilteredData (OUTPUT_HANDLE *outHandle, READINGSET *readings) |
Use the current input readings (they have been filtered by all filters). |
The ingest class is used to ingest asset readings.
It maintains a queue of readings to be sent to storage. A background thread wakes up regularly and sends the queued readings.
Ingest::Ingest | ( | StorageClient & | storage, |
const std::string & | serviceName, | ||
const std::string & | pluginName, | ||
ManagementClient * | mgmtClient | ||
) |
Construct an Ingest class to handle the readings queue.
A separate thread is used to send the readings to the storage layer based on time. This thread is created in the constructor and will terminate when the destructor is called.
storage | The storage client to use |
Ingest::~Ingest | ( | ) |
Destructor for the Ingest class.
Sets the running flag to false. This causes the processing thread to drain the queue and then exit. Once this thread has exited, the destructor returns.
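The shutdown sequence described above can be illustrated with a minimal, self-contained sketch. It is not the Fledge implementation: `DrainingWorker`, its `int` payload and the 5 ms polling interval are hypothetical stand-ins, chosen only to show a destructor that clears a running flag, lets the thread drain the queue, and returns after the thread has exited.

```cpp
#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical miniature of the lifecycle described above; not the Fledge class.
class DrainingWorker {
public:
    explicit DrainingWorker(std::vector<int>& sink)
        : m_sink(sink), m_running(true), m_thread(&DrainingWorker::run, this) {}

    ~DrainingWorker()
    {
        m_running = false;   // ask the processing thread to finish
        m_thread.join();     // return only once it has drained and exited
    }

    void add(int value)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_queue.push_back(value);
    }

private:
    void run()
    {
        while (m_running)
        {
            flush();
            std::this_thread::sleep_for(std::chrono::milliseconds(5)); // assumed interval
        }
        flush();             // final drain after the flag has been cleared
    }

    void flush()
    {
        std::vector<int> batch;
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            batch.swap(m_queue);
        }
        m_sink.insert(m_sink.end(), batch.begin(), batch.end());
    }

    std::vector<int>& m_sink;      // stands in for the storage layer
    std::mutex m_mutex;
    std::vector<int> m_queue;
    std::atomic<bool> m_running;
    std::thread m_thread;          // declared last so it starts after the other members exist
};
```

In this sketch, everything added before the object goes out of scope is guaranteed to reach the sink, because the final `flush()` runs after the flag is observed as false.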
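void Ingest::configChange | ( | const std::string & | category, |
const std::string & | newConfig | ||
) |
virtual |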
Configuration change for one of the filters or to the pipeline.
category | The name of the configuration category |
newConfig | The new category contents |
The category that has changed is the one for the south service itself. The only item that concerns us here is the filter item that defines the filter pipeline. We extract that item and check to see if it defines a pipeline that is different to the one we currently have.
If it is, we destroy the current pipeline and create a new one.
Implements ServiceHandler.
void Ingest::configureRateMonitor | ( | long | interval, |
long | factor | ||
) |
Configure the ingest rate class with the collection interval and the sigma factor allowed before reporting.
interval | Number of minutes to average ingest stats over |
factor | Number of standard deviations to allow before reporting |
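As an illustration of what a sigma factor means here, the following sketch flags a value that lies more than `factor` standard deviations from the mean of earlier samples. This is an assumption about the general technique only; the helper `isOutlier` is hypothetical, and the way the Fledge rate monitor actually collects and compares its statistics is not described in this page.

```cpp
#include <cmath>
#include <vector>

// Hypothetical helper: returns true when `current` lies more than `factor`
// standard deviations away from the mean of `history`.
inline bool isOutlier(const std::vector<double>& history, double current, double factor)
{
    if (history.size() < 2)
        return false;                       // not enough samples to estimate spread

    double sum = 0.0;
    for (double v : history)
        sum += v;
    const double mean = sum / history.size();

    double squares = 0.0;
    for (double v : history)
        squares += (v - mean) * (v - mean);
    const double sigma = std::sqrt(squares / history.size()); // population standard deviation

    return std::fabs(current - mean) > factor * sigma;
}
```

With a history of 100, 102, 98 and 100 readings per interval and a factor of 3, a rate of 103 would not be reported while a rate of 110 would.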
int Ingest::createStatsDbEntry | ( | const std::string & | assetName | ) |
Create a row for the given assetName in the statistics DB table if one is not already present. The key checked/created in the table is "<assetName>".
assetName | Asset name for the plugin that is sending readings |
void Ingest::flowControl | ( | ) |
Implement flow control backoff for the async ingest mechanism.
The flow control is "soft" in that it will only wait for a maximum amount of time before continuing regardless of the queue length.
The mechanism is to have a high water mark and a low water mark. When the queue gets longer than the high water mark, we wait until the queue drains below the low water mark before proceeding.
The wait is done with a backoff algorithm that starts at AFC_SLEEP_INCREMENT and doubles each time the queue has not dropped below the low water mark. It will sleep for a maximum of AFC_SLEEP_MAX before testing again.
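A minimal sketch of this kind of soft, doubling backoff follows. The constant values, the overall wait limit and the function `softFlowControl` are assumptions made for illustration; the real AFC_SLEEP_INCREMENT and AFC_SLEEP_MAX values and the exact loop live in the Fledge sources.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// Assumed values; the real AFC_SLEEP_INCREMENT and AFC_SLEEP_MAX may differ.
constexpr std::chrono::milliseconds kSleepIncrement{20};
constexpr std::chrono::milliseconds kSleepMax{200};
// Assumed overall limit that makes the flow control "soft".
constexpr std::chrono::milliseconds kTotalWaitMax{1000};

// queueLength is any callable reporting the current queue length.
// Returns the number of sleeps performed before continuing.
inline unsigned softFlowControl(const std::function<std::size_t()>& queueLength,
                                std::size_t lowWater, std::size_t highWater)
{
    assert(lowWater <= highWater);
    if (queueLength() <= highWater)
        return 0;                                  // not above the high water mark

    unsigned sleeps = 0;
    auto delay = kSleepIncrement;
    std::chrono::milliseconds waited{0};
    // Wait until the queue is below the low water mark or the soft limit expires.
    while (queueLength() >= lowWater && waited < kTotalWaitMax)
    {
        std::this_thread::sleep_for(delay);
        waited += delay;
        ++sleeps;
        delay = std::min(delay * 2, kSleepMax);    // double, capped at the maximum
    }
    return sleeps;
}
```

Tracking the nominal time waited, rather than reading a clock, keeps the sketch deterministic; a production version would more likely compare against a steady clock.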
void Ingest::ingest | ( | const Reading & | reading | ) |
Add a reading to the reading queue.
reading | The single reading to ingest |
void Ingest::ingest | ( | const std::vector< Reading *> * | vec | ) |
Add a set of readings to the reading queue.
vec | A vector of readings to ingest |
bool Ingest::loadFilters | ( | const std::string & | categoryName | ) |
Load filter plugins.
Filters found in the configuration are loaded and added to the Ingest class instance.
categoryName | Configuration category name |
ingest | The Ingest class reference; filters are added to its m_filters member |
Returns false on error.
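void Ingest::passToOnwardFilter | ( | OUTPUT_HANDLE * | outHandle, |
READINGSET * | readings | ||
) |
static |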
Pass the current readings set to the next filter in the pipeline.
Note: This routine must be passed to the "plugin_init" of every filter except the last one.
Static method
outHandle | Pointer to next filter |
readings | Current readings set |
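The chaining rule (every filter except the last forwards to the next filter, and the last one hands the readings back to the ingest) can be sketched as below. This is a simplified model, not the Fledge plugin interface: `Readings`, `Output`, `Filter` and `buildPipeline` are hypothetical names, and `std::function` stands in for the output handle and callback pair.

```cpp
#include <functional>
#include <utility>
#include <vector>

using Readings = std::vector<int>;                           // stand-in for READINGSET
using Output = std::function<void(Readings)>;                // stand-in for handle + callback
using Filter = std::function<void(Readings, const Output&)>; // a filter and its onward output

// Wire the filters so that each one forwards to the next and the last one
// delivers to `sink` (the role useFilteredData plays for the final filter).
inline Output buildPipeline(const std::vector<Filter>& filters, Output sink)
{
    Output next = std::move(sink);
    // Walk backwards so each filter captures the output that follows it.
    for (auto it = filters.rbegin(); it != filters.rend(); ++it)
    {
        Filter filter = *it;
        Output onward = next;
        next = [filter, onward](Readings r) { filter(std::move(r), onward); };
    }
    return next;   // entry point of the pipeline
}
```

Calling the returned entry point with a readings set runs the filters in their configured order and finishes in the sink.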
void Ingest::processQueue | ( | ) |
Process the queue of readings.
Send them to the storage layer as a block. If the append call fails, requeue the readings for the next transmission.
In order not to lock the queue for an excessive time, a new queue is created and the old one is moved to a local variable. This minimises the time we hold the queue mutex to the time it takes to swap two variables.
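The swap-under-lock idea can be sketched as follows. `SwapQueue` and the `send` callback are hypothetical; the sketch only shows that the mutex is held for the swap and for requeueing, never while the batch is being sent.

```cpp
#include <cstddef>
#include <functional>
#include <iterator>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for the readings queue; Item replaces Reading*.
template <typename Item>
class SwapQueue {
public:
    void push(Item item)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_queue.push_back(std::move(item));
    }

    // `send` is a hypothetical callback that returns false when the append
    // to storage fails. Returns the number of items sent.
    std::size_t process(const std::function<bool(const std::vector<Item>&)>& send)
    {
        std::vector<Item> batch;
        {
            // Hold the mutex only long enough to swap two vectors.
            std::lock_guard<std::mutex> guard(m_mutex);
            batch.swap(m_queue);
        }
        if (batch.empty() || send(batch))
            return batch.size();

        // Append failed: requeue the batch ahead of anything queued since.
        std::lock_guard<std::mutex> guard(m_mutex);
        batch.insert(batch.end(),
                     std::make_move_iterator(m_queue.begin()),
                     std::make_move_iterator(m_queue.end()));
        m_queue.swap(batch);
        return 0;
    }

    std::size_t size()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_queue.size();
    }

private:
    std::mutex m_mutex;
    std::vector<Item> m_queue;
};
```

Producers calling `push` are blocked only for the duration of a vector swap, however long the storage call takes.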
'm_data' vector is ready to be sent to storage service.
Note: m_data might contain:
bool Ingest::running | ( | ) |
Check if the ingest process is still running.
This becomes false when the service is shut down and is used to allow the queue to drain and then the processing routine to terminate.
void Ingest::setStatistics | ( | const std::string & | option | ) |
Set the statistics option.
The statistics collection regime may be one of "per asset", "per service" or "per asset & service".
option | The desired statistics collection regime |
void Ingest::start | ( | long | timeout, |
unsigned int | threshold | ||
) |
Start the ingest threads.
timeout | Maximum time before sending a queue of readings in milliseconds |
threshold | Length of queue before sending readings |
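How a timeout and a queue-length threshold can interact is shown in the sketch below: the sender waits until either the queue reaches the threshold or the timeout expires, whichever comes first. `BatchGate` is a hypothetical helper and the use of a condition variable is an assumption; this page does not say how the Fledge ingest thread implements its wait.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical helper, not part of the Fledge API.
class BatchGate {
public:
    BatchGate(long timeoutMs, unsigned int threshold)
        : m_timeout(timeoutMs), m_threshold(threshold) {}

    void add(int reading)
    {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_queue.push_back(reading);
        }
        m_cv.notify_one();   // wake the sender so it can re-check the threshold
    }

    // Returns true if the threshold was reached, false if the timeout expired first.
    bool waitForBatch()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_cv.wait_for(lock, m_timeout,
                             [this] { return m_queue.size() >= m_threshold; });
    }

    // Remove and return everything queued so far.
    std::vector<int> take()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        std::vector<int> batch;
        batch.swap(m_queue);
        return batch;
    }

private:
    std::chrono::milliseconds m_timeout;
    std::size_t m_threshold;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::vector<int> m_queue;
};
```

In either case the caller would then take whatever is queued and send it, so a slow source still has its readings delivered once per timeout period.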
void Ingest::unDeprecateAssetTrackingRecord | ( | AssetTrackingTuple * | currentTuple, |
const std::string & | assetName, | ||
const std::string & | event | ||
) |
Load an up-to-date AssetTracking record for the given parameters and un-deprecate it if it has been found to be deprecated. The existing cache element is updated.
currentTuple | Current AssetTracking record for given assetName |
assetName | AssetName to fetch from AssetTracking |
event | The event type to fetch |
void Ingest::unDeprecateStorageAssetTrackingRecord | ( | StorageAssetTrackingTuple * | currentTuple, |
const std::string & | assetName, | ||
const std::string & | , | ||
const unsigned int & | |||
) |
Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate it if it has been found to be deprecated. The existing cache element is updated.
currentTuple | Current StorageAssetTracking record for given assetName |
assetName | AssetName to fetch from AssetTracking |
datapoints | The datapoints comma separated list |
count | The number of datapoints per asset |
void Ingest::updateStats | ( | void | ) |
Update statistics for this south service.
Successfully processed readings are counted against the plugin asset name and READINGS keys. Statistics for discarded readings are updated against the DISCARDED key.
void Ingest::useFilteredData | ( | OUTPUT_HANDLE * | outHandle, |
READINGSET * | readings | ||
) |
static |
Use the current input readings (they have been filtered by all filters)
The assumption is that one of two things has happened.
Note: This routine must be passed to the "plugin_init" of the last filter only.
Static method
outHandle | Pointer to Ingest class instance |
readingSet | Filtered reading set being added to Ingest::m_data |