Fledge
An open source edge computing platform for industrial users
filter_pipeline.h
1 #ifndef _FILTER_PIPELINE_H
2 #define _FILTER_PIPELINE_H
3 /*
4  * Fledge filter pipeline class.
5  *
6  * Copyright (c) 2018 Dianomic Systems
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Amandeep Singh Arora
11  */
12 #include <plugin.h>
13 #include <plugin_manager.h>
14 #include <config_category.h>
15 #include <management_client.h>
16 #include <plugin_data.h>
17 #include <reading_set.h>
18 #include <filter_plugin.h>
19 #include <service_handler.h>
20 #include <pipeline_element.h>
21 typedef void (*filterReadingSetFn)(OUTPUT_HANDLE *outHandle, READINGSET* readings);
22 
29 {
30 
31 public:
33  StorageClient& storage,
34  std::string serviceName);
36  PipelineElement *getFirstFilterPlugin()
37  {
38  // First element of m_filters, or NULL when the vector is empty
39  return m_filters.empty() ? NULL : m_filters.front();
40  }
41  unsigned int getFilterCount() const { return static_cast<unsigned int>(m_filters.size()); } // Number of entries in m_filters; const because it only reads state
42  void configChange(const std::string&, const std::string&);
43 
44  // Cleanup the loaded filters
45  void cleanupFilters(const std::string& categoryName);
46  // Load filters as specified in the configuration
47  bool loadFilters(const std::string& categoryName);
48  // Setup the filter pipeline
49  bool setupFiltersPipeline(void *passToOnwardFilter,
50  void *useFilteredData,
51  void *ingest);
52  // Check FilterPipeline is ready for data ingest
53  bool isReady() const { return m_ready; } // Reports m_ready; const so it is callable on a const pipeline, like hasChanged()
54  bool hasChanged(const std::string& pipeline) const { return m_pipeline != pipeline; } // True when the argument differs from the stored m_pipeline; taken by reference to avoid a string copy per call
55  bool isShuttingDown() const { return m_shutdown; } // Reports m_shutdown; const because it only reads state
56  void setShuttingDown() { this->m_shutdown = true; } // Sets the m_shutdown flag that isShuttingDown() reports
57  void execute();
58  void awaitCompletion();
59  void startBranch();
60  void completeBranch();
61 
62 private:
63  PLUGIN_HANDLE loadFilterPlugin(const std::string& filterName);
64  void loadPipeline(const rapidjson::Value& filters, std::vector<PipelineElement *>& pipeline);
65 
66 protected:
67  ManagementClient* mgtClient;
68  StorageClient& storage;
69  std::string serviceName;
70  std::vector<PipelineElement *>
71  m_filters;
72  std::map<std::string, PipelineElement *>
73  m_filterCategories;
74  std::string m_pipeline;
75  bool m_ready;
76  bool m_shutdown;
77  ServiceHandler *m_serviceHandler;
78  int m_activeBranches;
79  std::mutex m_actives;
80  std::condition_variable m_branchActivations;
81 };
82 
83 #endif
bool loadFilters(const std::string &categoryName)
Load all filter plugins in the pipeline.
Definition: filter_pipeline.cpp:78
bool setupFiltersPipeline(void *passToOnwardFilter, void *useFilteredData, void *ingest)
Set up the filter pipeline.
Definition: filter_pipeline.cpp:237
~FilterPipeline()
FilterPipeline destructor.
Definition: filter_pipeline.cpp:39
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
void awaitCompletion()
Wait for all active branches of the pipeline to complete.
Definition: filter_pipeline.cpp:378
void completeBranch()
A branch in the pipeline has completed.
Definition: filter_pipeline.cpp:399
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
Reading set class.
Definition: reading_set.h:26
void cleanupFilters(const std::string &categoryName)
Cleanup all the loaded filters.
Definition: filter_pipeline.cpp:320
void startBranch()
A new branch has started in the pipeline.
Definition: filter_pipeline.cpp:390
The base pipeline element class.
Definition: pipeline_element.h:28
Client for accessing the storage service.
Definition: storage_client.h:43
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
void configChange(const std::string &, const std::string &)
Configuration change for one of the filters.
Definition: filter_pipeline.cpp:356
void execute()
Called when we pass the data into the pipeline.
Definition: filter_pipeline.cpp:369
FilterPipeline(ManagementClient *mgtClient, StorageClient &storage, std::string serviceName)
FilterPipeline class constructor.
Definition: filter_pipeline.cpp:31