Fledge
An open source edge computing platform for industrial users
pipeline_element.h
1 #ifndef _PIPELINE_ELEMENT_H
2 #define _PIPELINE_ELEMENT_H
3 /*
4  * Fledge filter pipeline elements.
5  *
6  * Copyright (c) 2024 Dianomic Systems
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch
11  */
12 #include <string>
13 #include <config_category.h>
14 #include <management_client.h>
15 #include <plugin.h>
16 #include <plugin_manager.h>
17 #include <plugin_data.h>
18 #include <reading_set.h>
19 #include <filter_plugin.h>
20 #include <service_handler.h>
21 #include <config_handler.h>
22 #include <pipeline_debugger.h>
23 
24 class FilterPipeline;
25 
30  public:
31  PipelineElement() : m_next(NULL), m_storage(NULL), m_debugger(NULL) {};
32  virtual ~PipelineElement() {};
33  void setNext(PipelineElement *next)
34  {
35  m_next = next;
36  };
37  PipelineElement *getNext()
38  {
39  return m_next;
40  };
41  void setService(const std::string& serviceName)
42  {
43  m_serviceName = serviceName;
44  };
45  void setStorage(StorageClient *storage)
46  {
47  m_storage = storage;
48  };
49  bool attachDebugger();
50  void detachDebugger();
51  void setDebuggerBuffer(unsigned int size);
52  std::vector<std::shared_ptr<Reading>>
54  static void ingest(void *handle, READINGSET *readings)
55  {
56  ((PipelineElement *)handle)->ingest(readings);
57  };
58  virtual bool setupConfiguration(ManagementClient * /* mgtClient */,
59  std::vector<std::string>& /* children */)
60  {
61  return false;
62  };
63  virtual bool isFilter()
64  {
65  return false;
66  };
67  virtual bool isBranch()
68  {
69  return false;
70  };
71  virtual void ingest(READINGSET *readingSet) = 0;
72  virtual bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories) = 0;
73  virtual bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output) = 0;
74  virtual void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler) = 0;
75  virtual void reconfigure(const std::string& /* newConfig */)
76  {
77  };
78  virtual std::string getName() = 0;
79  virtual bool isReady() = 0;
80  protected:
81  std::string m_serviceName;
82  PipelineElement *m_next;
83  StorageClient *m_storage;
84  PipelineDebugger *m_debugger;
85 };
86 
91  public:
92  PipelineFilter(const std::string& name, const ConfigCategory& filterDetails);
94  bool setupConfiguration(ManagementClient *mgtClient, std::vector<std::string>& children);
95  void ingest(READINGSET *readingSet)
96  {
97  if (m_debugger)
98  {
99  PipelineDebugger::DebuggerActions action =
100  m_debugger->process(readingSet);
101 
102  switch (action)
103  {
104  case PipelineDebugger::Block:
105  delete readingSet;
106  return;
107  case PipelineDebugger::NoAction:
108  break;
109  }
110 
111  }
112  if (m_plugin)
113  {
114  m_plugin->ingest(readingSet);
115  }
116  else
117  {
118  Logger::getLogger()->error("Pipeline filter %s has no plugin associated with it.", m_name.c_str());
119  }
120  };
121  bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories);
122  bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
123  void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler);
124  void reconfigure(const std::string& newConfig);
125  bool isFilter() { return true; };
126  std::string getCategoryName() { return m_categoryName; };
127  bool persistData() { return m_plugin->persistData(); };
128  void setPluginData(PluginData *data) { m_plugin->m_plugin_data = data; };
129  std::string getPluginData() { return m_plugin->m_plugin_data->loadStoredData(m_serviceName + m_name); };
130  void setServiceName(const std::string& name) { m_serviceName = name; };
131  std::string getName() { return m_name; };
132  bool isReady() { return true; };
133  private:
134  PLUGIN_HANDLE loadFilterPlugin(const std::string& filterName);
135  private:
136  std::string m_name; // The name of the filter instance
137  std::string m_categoryName;
138  std::string m_pluginName;
139  PLUGIN_HANDLE m_handle;
140  FilterPlugin *m_plugin;
141  std::string m_serviceName;
142  ConfigCategory m_updatedCfg;
143 };
144 
149  public:
151  ~PipelineBranch();
152  void ingest(READINGSET *readingSet);
153  std::string getName() { return "Branch"; };
154  bool setupConfiguration(ManagementClient *mgtClient, std::vector<std::string>& children);
155  bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories);
156  bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
157  void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler);
158  bool isReady();
159  bool isBranch()
160  {
161  return true;
162  };
163  std::vector<PipelineElement *>&
164  getBranchElements()
165  {
166  return m_branch;
167  };
168  void setFunctions(void *onward, void *use, void *ingest)
169  {
170  m_passOnward = onward;
171  m_useData = use;
172  m_ingest = ingest;
173  };
174  private:
175  static void branchHandler(void *instance);
176  void handler();
177  private:
178  std::vector<PipelineElement *> m_branch;
179  std::thread *m_thread;
180  std::queue<READINGSET *> m_queue;
181  std::mutex m_mutex;
182  std::condition_variable m_cv;
183  void *m_passOnward;
184  void *m_useData;
185  void *m_ingest;
186  bool m_shutdownCalled;
187  FilterPipeline *m_pipeline;
188 };
189 
194  public:
195  PipelineWriter();
196  std::string getName() { return "Writer"; };
197  void ingest(READINGSET *readingSet);
198  bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories);
199  bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
200  void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler);
201  bool isReady();
202  private:
203  OUTPUT_STREAM m_useData;
204  void *m_ingest;
205 };
206 
207 #endif
PipelineFilter::init
bool init(OUTPUT_HANDLE *outHandle, OUTPUT_STREAM output)
Initialise the pipeline filter ready for ingest of data.
Definition: pipeline_filter.cpp:162
PipelineDebugger::process
DebuggerActions process(ReadingSet *readingSet)
Process a reading set as it flows through the pipeline.
Definition: pipeline_debugger.cpp:38
Logger::getLogger
static Logger * getLogger()
Return the singleton instance of the logger class.
Definition: logger.cpp:184
PipelineElement::attachDebugger
bool attachDebugger()
Attach a debugger class to the pipeline.
Definition: pipeline_element.cpp:24
PipelineBranch::setupConfiguration
bool setupConfiguration(ManagementClient *mgtClient, std::vector< std::string > &children)
Setup the configuration for a branch in a pipeline.
Definition: pipeline_branch.cpp:66
FilterPlugin::ingest
void ingest(READINGSET *)
Call the loaded plugin "plugin_ingest" method.
Definition: filter_plugin.cpp:168
PipelineWriter::PipelineWriter
PipelineWriter()
Constructor for the pipeline writer, the element that sits at the end of every pipeline and branch.
Definition: pipeline_writer.cpp:23
Logger::error
void error(const std::string &msg,...)
Log a message at the level error.
Definition: logger.cpp:447
PipelineElement::detachDebugger
void detachDebugger()
Detach a pipeline debugger from the pipeline element.
Definition: pipeline_element.cpp:34
PipelineBranch::init
bool init(OUTPUT_HANDLE *outHandle, OUTPUT_STREAM output)
Initialise the pipeline branch.
Definition: pipeline_branch.cpp:110
ConfigHandler
Handler class within a service to manage configuration changes.
Definition: config_handler.h:25
FilterPipeline
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
PipelineElement::setDebuggerBuffer
void setDebuggerBuffer(unsigned int size)
Setup the size of the debug buffer.
Definition: pipeline_element.cpp:46
PipelineBranch::setup
bool setup(ManagementClient *mgmt, void *ingest, std::map< std::string, PipelineElement * > &categories)
Setup the configuration categories for the branch element of a pipeline.
Definition: pipeline_branch.cpp:85
ReadingSet
Reading set class.
Definition: reading_set.h:26
PipelineFilter::setupConfiguration
bool setupConfiguration(ManagementClient *mgtClient, std::vector< std::string > &children)
Setup the configuration for a filter in a pipeline.
Definition: pipeline_filter.cpp:61
FilterPlugin
Definition: filter_plugin.h:27
PipelineFilter::~PipelineFilter
~PipelineFilter()
Destructor for the pipeline filter element.
Definition: pipeline_filter.cpp:50
PipelineWriter::shutdown
void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler)
Shutdown the pipeline writer.
Definition: pipeline_writer.cpp:70
PipelineFilter::reconfigure
void reconfigure(const std::string &newConfig)
Reconfigure method.
Definition: pipeline_filter.cpp:218
PipelineFilter::PipelineFilter
PipelineFilter(const std::string &name, const ConfigCategory &filterDetails)
Construct the PipelineFilter class.
Definition: pipeline_filter.cpp:25
PipelineBranch
A pipeline element that represents a branch in the pipeline.
Definition: pipeline_element.h:148
ManagementClient
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
PipelineWriter::isReady
bool isReady()
Return if the pipeline writer is ready to receive data.
Definition: pipeline_writer.cpp:77
ConfigCategory
Definition: config_category.h:56
PipelineWriter::setup
bool setup(ManagementClient *mgmt, void *ingest, std::map< std::string, PipelineElement * > &categories)
Setup the pipeline writer.
Definition: pipeline_writer.cpp:52
PluginData::loadStoredData
std::string loadStoredData(const std::string &key)
Load stored data for a given key.
Definition: plugin_data.cpp:34
PipelineBranch::~PipelineBranch
~PipelineBranch()
Destructor for the pipeline branch.
Definition: pipeline_branch.cpp:33
PipelineWriter::init
bool init(OUTPUT_HANDLE *outHandle, OUTPUT_STREAM output)
Initialise the pipeline writer.
Definition: pipeline_writer.cpp:60
StorageClient
Client for accessing the storage service.
Definition: storage_client.h:43
PluginData
Definition: plugin_data.h:15
PipelineBranch::ingest
void ingest(READINGSET *readingSet)
Ingest a set of readings and pass on in the pipeline.
Definition: pipeline_branch.cpp:172
PipelineFilter
A pipeline element the runs a filter plugin.
Definition: pipeline_element.h:90
PipelineBranch::isReady
bool isReady()
Return if the branch is ready to be executed.
Definition: pipeline_branch.cpp:243
PipelineBranch::shutdown
void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler)
Setup the configuration categories for the branch element of a pipeline.
Definition: pipeline_branch.cpp:216
PipelineElement
The base pipeline element class.
Definition: pipeline_element.h:29
PipelineDebugger
The debugger class for elements in a pipeline.
Definition: pipeline_debugger.h:22
PipelineWriter
A pipeline element that writes to a storage service or buffer.
Definition: pipeline_element.h:193
PipelineElement::getDebuggerBuffer
std::vector< std::shared_ptr< Reading > > getDebuggerBuffer()
Fetch the content of the debugger buffer.
Definition: pipeline_element.cpp:62
PipelineFilter::shutdown
void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler)
Shutdown a pipeline element that is a filter.
Definition: pipeline_filter.cpp:187
PipelineFilter::setup
bool setup(ManagementClient *mgmt, void *ingest, std::map< std::string, PipelineElement * > &categories)
Setup the configuration categories for the filter element in a pipeline.
Definition: pipeline_filter.cpp:127
PipelineBranch::PipelineBranch
PipelineBranch(FilterPipeline *parent)
Constructor for a branch in a filter pipeline.
Definition: pipeline_branch.cpp:22
PipelineWriter::ingest
void ingest(READINGSET *readingSet)
Ingest into a pipeline writer.
Definition: pipeline_writer.cpp:30
ServiceHandler
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20