 |
Fledge
An open source edge computing platform for industrial users
|
1 #ifndef _PIPELINE_ELEMENT_H
2 #define _PIPELINE_ELEMENT_H
13 #include <config_category.h>
14 #include <management_client.h>
16 #include <plugin_manager.h>
17 #include <plugin_data.h>
18 #include <reading_set.h>
19 #include <filter_plugin.h>
20 #include <service_handler.h>
21 #include <config_handler.h>
22 #include <pipeline_debugger.h>
41 void setService(
const std::string& serviceName)
43 m_serviceName = serviceName;
52 std::vector<std::shared_ptr<Reading>>
54 static void ingest(
void *handle,
READINGSET *readings)
59 std::vector<std::string>& )
63 virtual bool isFilter()
67 virtual bool isBranch()
71 virtual void ingest(
READINGSET *readingSet) = 0;
72 virtual bool setup(
ManagementClient *mgmt,
void *ingest, std::map<std::string, PipelineElement*>& categories) = 0;
73 virtual bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output) = 0;
75 virtual void reconfigure(
const std::string& )
78 virtual std::string getName() = 0;
79 virtual bool isReady() = 0;
81 std::string m_serviceName;
99 PipelineDebugger::DebuggerActions action =
100 m_debugger->
process(readingSet);
104 case PipelineDebugger::Block:
107 case PipelineDebugger::NoAction:
114 m_plugin->
ingest(readingSet);
121 bool setup(
ManagementClient *mgmt,
void *ingest, std::map<std::string, PipelineElement*>& categories);
122 bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
125 bool isFilter() {
return true; };
126 std::string getCategoryName() {
return m_categoryName; };
127 bool persistData() {
return m_plugin->persistData(); };
128 void setPluginData(
PluginData *data) { m_plugin->m_plugin_data = data; };
129 std::string getPluginData() {
return m_plugin->m_plugin_data->
loadStoredData(m_serviceName + m_name); };
130 void setServiceName(
const std::string& name) { m_serviceName = name; };
131 std::string getName() {
return m_name; };
132 bool isReady() {
return true; };
134 PLUGIN_HANDLE loadFilterPlugin(
const std::string& filterName);
137 std::string m_categoryName;
138 std::string m_pluginName;
139 PLUGIN_HANDLE m_handle;
141 std::string m_serviceName;
153 std::string getName() {
return "Branch"; };
156 bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
163 std::vector<PipelineElement *>&
168 void setFunctions(
void *onward,
void *use,
void *
ingest)
170 m_passOnward = onward;
175 static void branchHandler(
void *instance);
178 std::vector<PipelineElement *> m_branch;
179 std::thread *m_thread;
180 std::queue<READINGSET *> m_queue;
182 std::condition_variable m_cv;
186 bool m_shutdownCalled;
196 std::string getName() {
return "Writer"; };
199 bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
203 OUTPUT_STREAM m_useData;
bool init(OUTPUT_HANDLE *outHandle, OUTPUT_STREAM output)
Initialise the pipeline filter ready for ingest of data.
Definition: pipeline_filter.cpp:162
DebuggerActions process(ReadingSet *readingSet)
Process a reading set as it flows through the pipeline.
Definition: pipeline_debugger.cpp:38
static Logger * getLogger()
Return the singleton instance of the logger class.
Definition: logger.cpp:184
bool attachDebugger()
Attach a debugger class to the pipeline.
Definition: pipeline_element.cpp:24
bool setupConfiguration(ManagementClient *mgtClient, std::vector< std::string > &children)
Setup the configuration for a branch in a pipeline.
Definition: pipeline_branch.cpp:66
void ingest(READINGSET *)
Call the loaded plugin "plugin_ingest" method.
Definition: filter_plugin.cpp:168
PipelineWriter()
Constructor for the pipeline writer, the element that sits at the end of every pipeline and branch.
Definition: pipeline_writer.cpp:23
void error(const std::string &msg,...)
Log a message at the level error.
Definition: logger.cpp:447
void detachDebugger()
Detach a pipeline debugger from the pipeline element.
Definition: pipeline_element.cpp:34
bool init(OUTPUT_HANDLE *outHandle, OUTPUT_STREAM output)
Initialise the pipeline branch.
Definition: pipeline_branch.cpp:110
Handler class within a service to manage configuration changes.
Definition: config_handler.h:25
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
void setDebuggerBuffer(unsigned int size)
Set the size of the debugger buffer.
Definition: pipeline_element.cpp:46
bool setup(ManagementClient *mgmt, void *ingest, std::map< std::string, PipelineElement * > &categories)
Setup the configuration categories for the branch element of a pipeline.
Definition: pipeline_branch.cpp:85
Reading set class.
Definition: reading_set.h:26
bool setupConfiguration(ManagementClient *mgtClient, std::vector< std::string > &children)
Setup the configuration for a filter in a pipeline.
Definition: pipeline_filter.cpp:61
Definition: filter_plugin.h:27
~PipelineFilter()
Destructor for the pipeline filter element.
Definition: pipeline_filter.cpp:50
void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler)
Shutdown the pipeline writer.
Definition: pipeline_writer.cpp:70
void reconfigure(const std::string &newConfig)
Reconfigure method.
Definition: pipeline_filter.cpp:218
PipelineFilter(const std::string &name, const ConfigCategory &filterDetails)
Construct the PipelineFilter class.
Definition: pipeline_filter.cpp:25
A pipeline element that represents a branch in the pipeline.
Definition: pipeline_element.h:148
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
bool isReady()
Return whether the pipeline writer is ready to receive data.
Definition: pipeline_writer.cpp:77
Definition: config_category.h:56
bool setup(ManagementClient *mgmt, void *ingest, std::map< std::string, PipelineElement * > &categories)
Setup the pipeline writer.
Definition: pipeline_writer.cpp:52
std::string loadStoredData(const std::string &key)
Load stored data for a given key.
Definition: plugin_data.cpp:34
~PipelineBranch()
Destructor for the pipeline branch.
Definition: pipeline_branch.cpp:33
bool init(OUTPUT_HANDLE *outHandle, OUTPUT_STREAM output)
Initialise the pipeline writer.
Definition: pipeline_writer.cpp:60
Client for accessing the storage service.
Definition: storage_client.h:43
Definition: plugin_data.h:15
void ingest(READINGSET *readingSet)
Ingest a set of readings and pass on in the pipeline.
Definition: pipeline_branch.cpp:172
A pipeline element that runs a filter plugin.
Definition: pipeline_element.h:90
bool isReady()
Return whether the branch is ready to be executed.
Definition: pipeline_branch.cpp:243
void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler)
Shut down the branch element of a pipeline.
Definition: pipeline_branch.cpp:216
The base pipeline element class.
Definition: pipeline_element.h:29
The debugger class for elements in a pipeline.
Definition: pipeline_debugger.h:22
A pipeline element that writes to a storage service or buffer.
Definition: pipeline_element.h:193
std::vector< std::shared_ptr< Reading > > getDebuggerBuffer()
Fetch the content of the debugger buffer.
Definition: pipeline_element.cpp:62
void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler)
Shutdown a pipeline element that is a filter.
Definition: pipeline_filter.cpp:187
bool setup(ManagementClient *mgmt, void *ingest, std::map< std::string, PipelineElement * > &categories)
Setup the configuration categories for the filter element in a pipeline.
Definition: pipeline_filter.cpp:127
PipelineBranch(FilterPipeline *parent)
Constructor for a branch in a filter pipeline.
Definition: pipeline_branch.cpp:22
void ingest(READINGSET *readingSet)
Ingest into a pipeline writer.
Definition: pipeline_writer.cpp:30
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20