12 #include <storage_client.h> 21 #include <unordered_set> 22 #include <condition_variable> 23 #include <filter_plugin.h> 24 #include <filter_pipeline.h> 25 #include <asset_tracking.h> 26 #include <service_handler.h> 28 #include <perfmonitors.h> 30 #define SERVICE_NAME "Fledge South" 32 #define INGEST_SUFFIX "-Ingest" // Suffix for per service ingest statistic 34 #define FLUSH_STATS_INTERVAL 5 // Period between flushing of stats to storage (seconds) 36 #define STATS_UPDATE_FAIL_THRESHOLD 10 // After this many update fails try creating new stats 38 #define DEPRECATED_CACHE_AGE 600 // Maximum allowed age of the deprecated asset cache 44 #define AFC_SLEEP_INCREMENT 20 // Number of milliseconds to wait for readings to drain 45 #define AFC_SLEEP_MAX 200 // Maximum sleep time in ms between tests 46 #define AFC_MAX_WAIT 5000 // Maximum amount of time we wait for the queue to drain 60 const std::string& serviceName,
61 const std::string& pluginName,
/**
 * Ingest a batch of readings, supplied as a pointer to a vector of
 * Reading pointers.
 *
 * NOTE(review): presumably each reading is added to the reading queue, as
 * the single-reading ingest(const Reading&) overload does; ownership of the
 * vector and its elements is not visible here - confirm in ingest.cpp.
 *
 * @param vec	Pointer to the vector of readings to ingest
 */
66 void ingest(
const std::vector<Reading *> *vec);
/**
 * Start the ingest threads.
 *
 * @param timeout	Timeout value for the ingest process
 *			(NOTE(review): units are not visible in this header - confirm)
 * @param threshold	Queue size threshold (see setThreshold())
 */
67 void start(
long timeout,
unsigned int threshold);
70 bool isRunning() {
return !m_shutdown; };
83 void setTimeout(
const long timeout) { m_timeout = timeout; };
84 void setThreshold(
const unsigned int threshold) { m_queueSizeThreshold = threshold; };
/**
 * Configuration change for one of the filters or to the pipeline.
 *
 * NOTE(review): the two string arguments are unnamed in this declaration;
 * presumably the category name followed by the new configuration - confirm
 * in ingest.cpp.
 */
85 void configChange(
const std::string&,
const std::string&);
/**
 * Handler for the creation of a child configuration category.
 *
 * Intentionally a no-op: all three string arguments are ignored.
 * NOTE(review): presumably required by the ServiceHandler interface - confirm.
 */
void configChildCreate(const std::string&,
		       const std::string&,
		       const std::string&)
{
}
/**
 * Handler for the deletion of a child configuration category.
 *
 * Intentionally a no-op: both string arguments are ignored.
 * NOTE(review): presumably required by the ServiceHandler interface - confirm.
 */
void configChildDelete(const std::string&,
		       const std::string&)
{
}
91 const std::string& assetName,
92 const std::string& event);
94 const std::string& assetName,
/**
 * Build a single string from a set of strings.
 *
 * NOTE(review): the exact output format (separator, ordering) is defined in
 * ingest.cpp and is not visible here - confirm before relying on it.
 *
 * @param dpSet	The set of strings to convert
 * @return	The string built from the set
 */
99 std::string getStringFromSet(
const std::set<std::string> &dpSet);
100 void setFlowControl(
unsigned int lowWater,
unsigned int highWater) { m_lowWater = lowWater; m_highWater = highWater; };
109 void signalStatsUpdate() {
111 std::lock_guard<std::mutex> guard(m_statsMutex);
112 m_statsCv.notify_all();
114 void logDiscardedStat() {
115 std::lock_guard<std::mutex> guard(m_statsMutex);
116 m_discardedReadings++;
// Compute a wait time.
// NOTE(review): units and what is being waited for are defined in ingest.cpp - confirm.
118 long calculateWaitTime();
// Create the statistics DB entry for this service.
// NOTE(review): presumably the per-service counterpart of createStatsDbEntry();
// the meaning of the int return value is not visible here - confirm in ingest.cpp.
119 int createServiceStatsDbEntry();
// Queue size threshold; updated by setThreshold()
124 unsigned int m_queueSizeThreshold;
// NOTE(review): presumably copies of the constructor's serviceName / pluginName arguments - confirm
126 std::string m_serviceName;
127 std::string m_pluginName;
130 std::vector<Reading *>* m_queue;
// Held by signalStatsUpdate() and logDiscardedStat()
132 std::mutex m_statsMutex;
133 std::mutex m_pipelineMutex;
134 std::thread* m_thread;
135 std::thread* m_statsThread;
137 std::condition_variable m_cv;
// Notified (notify_all) by signalStatsUpdate() while m_statsMutex is held
138 std::condition_variable m_statsCv;
140 std::vector<Reading *>* m_data;
// NOTE(review): the member name for the next declaration is missing from this listing
141 std::vector<std::vector<Reading *>*>
// NOTE(review): the member name for the next declaration is missing from this listing
143 std::queue<std::vector<Reading *>*>
145 std::mutex m_fqMutex;
// Incremented by logDiscardedStat() while m_statsMutex is held
146 unsigned int m_discardedReadings;
149 std::unordered_set<std::string> statsDbEntriesCache;
150 std::map<std::string, int> statsPendingEntries;
153 time_t m_reportedLatencyTime;
155 bool m_storageFailed;
157 int m_statsUpdateFails;
// NOTE(review): the member name for this enum is missing from this listing
158 enum { STATS_BOTH, STATS_ASSET, STATS_SERVICE }
// Flow control water marks; both are set by setFlowControl()
160 unsigned int m_highWater;
161 unsigned int m_lowWater;
163 time_t m_deprecatedAgeOut;
164 time_t m_deprecatedAgeOutStorage;
166 std::mutex m_useDataMutex;
Fledge Logger class used to log to syslog.
Definition: logger.h:26
A class to hold a set of asset tracking tuples that allows lookup by name.
Definition: asset_tracking.h:305
void setStatistics(const std::string &option)
Set the statistics option.
Definition: ingest.cpp:1470
bool running()
Check if the ingest process is still running.
Definition: ingest.cpp:412
void configChange(const std::string &, const std::string &)
Configuration change for one of the filters or to the pipeline.
Definition: ingest.cpp:1098
void start(long timeout, unsigned int threshold)
Start the ingest threads.
Definition: ingest.cpp:328
The AssetTrackingTuple class is used to represent an asset tracking tuple.
Definition: asset_tracking.h:47
void processQueue()
Process the queue of readings.
Definition: ingest.cpp:559
bool isStopping()
Check if a shutdown is requested.
Definition: ingest.cpp:421
void unDeprecateAssetTrackingRecord(AssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &event)
Load an up-to-date AssetTracking record for the given parameters and un-deprecate the AssetTracking record...
Definition: ingest.cpp:1209
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
void flowControl()
Implement flow control backoff for the async ingest mechanism.
Definition: ingest.cpp:1512
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
An asset reading represented as a class.
Definition: reading.h:33
void updateStats(void)
Update statistics for this south service.
Definition: ingest.cpp:163
Reading set class.
Definition: reading_set.h:26
static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current input readings (they have been filtered by all filters)
Definition: ingest.cpp:1069
size_t queueLength()
Return the number of queued readings in the south service.
Definition: ingest.cpp:1189
The ingest class is used to ingest asset readings.
Definition: ingest.h:56
void waitForQueue()
Wait for a period of time to allow the queue to build.
Definition: ingest.cpp:532
~Ingest()
Destructor for the Ingest class.
Definition: ingest.cpp:345
void configureRateMonitor(long interval, long factor)
Configure the ingest rate class with the collection interval and the sigma factor allowed before repo...
Definition: ingest.cpp:1545
void ingest(const Reading &reading)
Add a reading to the reading queue.
Definition: ingest.cpp:431
Client for accessing the storage service.
Definition: storage_client.h:43
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
bool loadFilters(const std::string &categoryName)
Load filter plugins.
Definition: ingest.cpp:992
void unDeprecateStorageAssetTrackingRecord(StorageAssetTrackingTuple *currentTuple, const std::string &assetName, const std::string &, const unsigned int &)
Load an up-to-date StorageAssetTracking record for the given parameters and un-deprecate StorageAsset...
Definition: ingest.cpp:1363
A class used to track and report on the ingest rates of a data stream.
Definition: ingest_rate.h:49
Ingest(StorageClient &storage, const std::string &serviceName, const std::string &pluginName, ManagementClient *mgmtClient)
Construct an Ingest class to handle the readings queue.
Definition: ingest.cpp:282
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: ingest.cpp:1038
int createStatsDbEntry(const std::string &assetName)
Create a row for the given assetName in the statistics DB table, if not already present. The key checked/creat...
Definition: ingest.cpp:59
Definition: asset_tracking.h:129