Fledge
An open source edge computing platform for industrial users
Ingest Class Reference

The ingest class is used to ingest asset readings. More...

#include <ingest.h>

Inheritance diagram for Ingest:
Collaboration diagram for Ingest:

Public Member Functions

 Ingest (StorageClient &storage, const std::string &serviceName, const std::string &pluginName, ManagementClient *mgmtClient)
 Construct an Ingest class to handle the readings queue. More...
 
 ~Ingest ()
 Destructor for the Ingest class. More...
 
void ingest (const Reading &reading)
 Add a reading to the reading queue. More...
 
void ingest (const std::vector< Reading *> *vec)
 Add a set of readings to the reading queue. More...
 
void start (long timeout, unsigned int threshold)
 Start the ingest threads. More...
 
bool running ()
 Check if the ingest process is still running. More...
 
bool isStopping ()
 Check if a shutdown is requested.
 
bool isRunning ()
 
void processQueue ()
 Process the queue of readings. More...
 
void waitForQueue ()
 Wait for a period of time to allow the queue to build.
 
size_t queueLength ()
 Return the number of queued readings in the south service.
 
void updateStats (void)
 Update statistics for this south service. More...
 
int createStatsDbEntry (const std::string &assetName)
 Create a row for given assetName in statistics DB table, if not present already The key checked/created in the table is "<assetName>". More...
 
bool loadFilters (const std::string &categoryName)
 Load filter plugins. More...
 
void setTimeout (const long timeout)
 
void setThreshold (const unsigned int threshold)
 
void configChange (const std::string &, const std::string &)
 Configuration change for one of the filters or to the pipeline. More...
 
void configChildCreate (const std::string &, const std::string &, const std::string &)
 
void configChildDelete (const std::string &, const std::string &)
 
void shutdown ()
 
void restart ()
 
void unDeprecateAssetTrackingRecord (AssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &event)
 Load an up-to-date AssetTracking record for the given parameters and un-deprecate AssetTracking record it has been found as deprecated Existing cache element is updated. More...
 
void unDeprecateStorageAssetTrackingRecord (StorageAssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &, const unsigned int &)
 Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate StorageAssetTracking record it has been found as deprecated Existing cache element is updated. More...
 
void setStatistics (const std::string &option)
 Set the statistics option. More...
 
std::string getStringFromSet (const std::set< std::string > &dpSet)
 
void setFlowControl (unsigned int lowWater, unsigned int highWater)
 
void flowControl ()
 Implement flow control backoff for the async ingest mechanism. More...
 
void setPerfMon (PerformanceMonitor *mon)
 
void configureRateMonitor (long interval, long factor)
 Configure the ingest rate class with the collection interval and the sigma factor allowed before reporting. More...
 
- Public Member Functions inherited from ServiceHandler
virtual bool securityChange (const std::string &payload)
 

Static Public Member Functions

static void passToOnwardFilter (OUTPUT_HANDLE *outHandle, READINGSET *readings)
 Pass the current readings set to the next filter in the pipeline. More...
 
static void useFilteredData (OUTPUT_HANDLE *outHandle, READINGSET *readings)
 Use the current input readings (they have been filtered by all filters) More...
 

Detailed Description

The ingest class is used to ingest asset readings.

It maintains a queue of readings to be sent to storage, these are sent using a background thread that regularly wakes up and sends the queued readings.

Constructor & Destructor Documentation

◆ Ingest()

Ingest::Ingest ( StorageClient storage,
const std::string &  serviceName,
const std::string &  pluginName,
ManagementClient mgmtClient 
)

Construct an Ingest class to handle the readings queue.

A seperate thread is used to send the readings to the storage layer based on time. This thread in created in the constructor and will terminate when the destructor is called.

Parameters
storageThe storage client to use

◆ ~Ingest()

Ingest::~Ingest ( )

Destructor for the Ingest class.

Set's the running flag to false. This will cause the processing thread to drain the queue and then exit. Once this thread has exited the destructor will return.

Member Function Documentation

◆ configChange()

void Ingest::configChange ( const std::string &  ,
const std::string &   
)
virtual

Configuration change for one of the filters or to the pipeline.

Parameters
categoryThe name of the configuration category
newConfigThe new category contents

The category that has changed is the one for the south service itself. The only item that concerns us here is the filter item that defines the filter pipeline. We extract that item and check to see if it defines a pipeline that is different to the one we currently have.

If it is we destroy the current pipeline and create a new one.

Implements ServiceHandler.

◆ configureRateMonitor()

void Ingest::configureRateMonitor ( long  interval,
long  factor 
)

Configure the ingest rate class with the collection interval and the sigma factor allowed before reporting.

Parameters
intervalNumber of minutes to average ingest stats over
factorNumber of standard deviations to allow before reporting

◆ createStatsDbEntry()

int Ingest::createStatsDbEntry ( const std::string &  assetName)

Create a row for given assetName in statistics DB table, if not present already The key checked/created in the table is "<assetName>".

Parameters
assetNameAsset name for the plugin that is sending readings
Returns
int Return -1 on error, 0 if not required or 1 if the entry exists

◆ flowControl()

void Ingest::flowControl ( )

Implement flow control backoff for the async ingest mechanism.

The flow control is "soft" in that it will only wait for a maximum amount of time before continuing regardless of the queue length.

The mechanism is to have a high water and low water mark. When the queue get longer than the high water mark we wait until the queue drains below the low water mark before proceeding.

The wait is done with a backoff algorithm that start at AFC_SLEEP_INCREMENT and doubles each time we have not dropped below the low water mark. It will sleep for a maximum of AFC_SLEEP_MAX before testing again.

◆ ingest() [1/2]

void Ingest::ingest ( const Reading reading)

Add a reading to the reading queue.

Parameters
readingThe single reading to ingest

◆ ingest() [2/2]

void Ingest::ingest ( const std::vector< Reading *> *  vec)

Add a set of readings to the reading queue.

Parameters
vecA vector of readings to ingest

◆ loadFilters()

bool Ingest::loadFilters ( const std::string &  categoryName)

Load filter plugins.

Filters found in configuration are loaded and add to the Ingest class instance

Parameters
categoryNameConfiguration category name
ingestThe Ingest class reference Filters are added to m_filters member False for errors.
Returns
True if filters were loaded and initialised or there are no filters False with load/init errors

◆ passToOnwardFilter()

void Ingest::passToOnwardFilter ( OUTPUT_HANDLE *  outHandle,
READINGSET readingSet 
)
static

Pass the current readings set to the next filter in the pipeline.

Note: This routine must be passed to all filters "plugin_init" except the last one

Static method

Parameters
outHandlePointer to next filter
readingsCurrent readings set

◆ processQueue()

void Ingest::processQueue ( )

Process the queue of readings.

Send them to the storage layer as a block. If the append call fails requeue the readings for the next transmission.

In order not to lock the queue for an excessie time a new queue is created and the old one moved to a local variable. This minimise the time we hold the queue mutex to the time it takes to swap two variables.

'm_data' vector is ready to be sent to storage service.

Note: m_data might contain:

  • Readings set by the configured service "plugin" OR
  • filtered readings by filter plugins in 'readingSet' object: 1- values only 2- some readings removed 3- New set of readings

◆ running()

bool Ingest::running ( )

Check if the ingest process is still running.

This becomes false when the service is shutdown and is used to allow the queue to drain and then the processing routine to terminate.

◆ setStatistics()

void Ingest::setStatistics ( const std::string &  option)

Set the statistics option.

The statistics collection regime may be one of "per asset", "per service" or "per asset & service".

Parameters
optionThe desired statistics collection regime

◆ start()

void Ingest::start ( long  timeout,
unsigned int  threshold 
)

Start the ingest threads.

Parameters
timeoutMaximum time before sending a queue of readings in milliseconds
thresholdLength of queue before sending readings

◆ unDeprecateAssetTrackingRecord()

void Ingest::unDeprecateAssetTrackingRecord ( AssetTrackingTuple currentTuple,
const std::string &  assetName,
const std::string &  event 
)

Load an up-to-date AssetTracking record for the given parameters and un-deprecate AssetTracking record it has been found as deprecated Existing cache element is updated.

Parameters
currentTupleCurrent AssetTracking record for given assetName
assetNameAssetName to fetch from AssetTracking
eventThe event type to fetch

◆ unDeprecateStorageAssetTrackingRecord()

void Ingest::unDeprecateStorageAssetTrackingRecord ( StorageAssetTrackingTuple currentTuple,
const std::string &  assetName,
const std::string &  ,
const unsigned int &   
)

Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate StorageAssetTracking record it has been found as deprecated Existing cache element is updated.

Parameters
currentTupleCurrent StorageAssetTracking record for given assetName
assetNameAssetName to fetch from AssetTracking
datapointsThe datapoints comma separated list
countThe number of datapoints per asset

◆ updateStats()

void Ingest::updateStats ( void  )

Update statistics for this south service.

Successfully processed readings are reflected against plugin asset name and READINGS keys. Discarded readings stats are updated against DISCARDED key.

◆ useFilteredData()

void Ingest::useFilteredData ( OUTPUT_HANDLE *  outHandle,
READINGSET readingSet 
)
static

Use the current input readings (they have been filtered by all filters)

The assumption is that one of two things has happened.

  1. The filtering has all been done in place. In which case the m_data vector is in the ReadingSet passed in here.
  2. The filtering has created new ReadingSet in which case the reading vector must be copied into m_data from the ReadingSet.

Note: This routine must be passed to last filter "plugin_init" only

Static method

Parameters
outHandlePointer to Ingest class instance
readingSetFiltered reading set being added to Ingest::m_data

The documentation for this class was generated from the following files: