Fledge
An open source edge computing platform for industrial users
pipeline_element.h
1 #ifndef _PIPELINE_ELEMENT_H
2 #define _PIPELINE_ELEMENT_H
3 /*
4  * Fledge filter pipeline elements.
5  *
6  * Copyright (c) 2024 Dianomic Systems
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch
11  */
12 #include <string>
13 #include <config_category.h>
14 #include <management_client.h>
15 #include <plugin.h>
16 #include <plugin_manager.h>
17 #include <plugin_data.h>
18 #include <reading_set.h>
19 #include <filter_plugin.h>
20 #include <service_handler.h>
21 #include <config_handler.h>
22 
23 class FilterPipeline;
24 
29  public:
/**
 * Construct an element with no successor and no storage client.
 * Both pointers start out null until setNext() / setStorage() are called.
 */
30  PipelineElement() : m_next(nullptr), m_storage(nullptr) {};
/**
 * Virtual destructor so that concrete elements can be destroyed through a
 * PipelineElement pointer. The base class itself releases nothing: neither
 * m_next nor m_storage is deleted here.
 */
31  virtual ~PipelineElement() {};
/**
 * Record the element that follows this one.
 *
 * The pointer is stored as given; no validation is performed.
 *
 * @param next	The following element (may be null)
 */
32  void setNext(PipelineElement *next)
33  {
34  m_next = next;
35  };
/**
 * Return the element recorded by setNext().
 *
 * @return	The stored successor pointer; null if setNext() has not been
 *		called, since the constructor initialises it to null
 */
36  PipelineElement *getNext()
37  {
38  return m_next;
39  };
/**
 * Store the service name in this element's m_serviceName member.
 *
 * @param serviceName	The name to store (copied)
 */
40  void setService(const std::string& serviceName)
41  {
42  m_serviceName = serviceName;
43  };
/**
 * Store the storage client pointer in m_storage.
 *
 * The pointer is kept as given; the base class destructor does not delete it.
 *
 * @param storage	The storage client pointer to remember
 */
44  void setStorage(StorageClient *storage)
45  {
46  m_storage = storage;
47  };
/**
 * Static entry point that forwards a reading set to an element's virtual
 * ingest() method.
 *
 * The opaque handle is converted back to a PipelineElement pointer with a
 * named cast and is not checked for null.
 * NOTE(review): presumably this exists so it can be handed to code that
 * expects a plain function plus an opaque handle - confirm against callers.
 *
 * @param handle	Opaque pointer that must refer to a PipelineElement
 * @param readings	The reading set to pass to the element
 */
48  static void ingest(void *handle, READINGSET *readings)
49  {
50  static_cast<PipelineElement *>(handle)->ingest(readings);
51  };
/**
 * Default configuration setup: ignores both arguments and returns false.
 *
 * Virtual, so element types that need configuration can replace it.
 * NOTE(review): the meaning attached to the false return is decided by
 * the caller, which is not visible here.
 *
 * @return	Always false in this default implementation
 */
52  virtual bool setupConfiguration(ManagementClient * /* mgtClient */,
53  std::vector<std::string>& /* children */)
54  {
55  return false;
56  };
/**
 * Report whether this element is a filter.
 *
 * @return	false by default; element types that are filters replace this
 */
57  virtual bool isFilter()
58  {
59  return false;
60  };
/**
 * Report whether this element is a branch.
 *
 * @return	false by default; element types that are branches replace this
 */
61  virtual bool isBranch()
62  {
63  return false;
64  };
/**
 * Process a set of readings. Pure virtual: every concrete element supplies
 * its own behaviour.
 *
 * @param readingSet	The readings handed to this element
 */
65  virtual void ingest(READINGSET *readingSet) = 0;
/**
 * Set up the element. Pure virtual.
 *
 * @param mgmt		Management client pointer
 * @param ingest	Opaque pointer. NOTE(review): presumably the ingest callback - confirm
 * @param categories	Map from a string key to a pipeline element.
 *			NOTE(review): presumably keyed by category name - confirm
 * @return		A bool whose meaning is defined by the implementations
 */
66  virtual bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories) = 0;
/**
 * Initialise the element with an output handle and output stream. Pure virtual.
 *
 * @param outHandle	Output handle pointer
 * @param output	Output stream value
 * @return		A bool whose meaning is defined by the implementations
 */
67  virtual bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output) = 0;
/**
 * Shut the element down. Pure virtual.
 *
 * @param serviceHandler	Service handler pointer
 * @param configHandler		Configuration handler pointer
 */
68  virtual void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler) = 0;
/**
 * Default reconfiguration hook: does nothing and ignores its argument.
 * Virtual, so element types that hold configuration can replace it.
 */
69  virtual void reconfigure(const std::string& /* newConfig */)
70  {
71  };
/**
 * Return a name for this element. Pure virtual.
 */
72  virtual std::string getName() = 0;
/**
 * Report whether the element is ready. Pure virtual; what "ready" means is
 * defined by each implementation.
 */
73  virtual bool isReady() = 0;
74  protected:
75  std::string m_serviceName; // Written by setService()
76  PipelineElement *m_next; // Written by setNext(), read by getNext(); null until set
77  StorageClient *m_storage; // Written by setStorage(); null until set
78 
79 };
80 
85  public:
/**
 * Construct a filter element. Defined out of line.
 *
 * @param name		Name for the filter. NOTE(review): presumably stored in m_name - confirm
 * @param filterDetails	Configuration category describing the filter
 */
86  PipelineFilter(const std::string& name, const ConfigCategory& filterDetails);
87  ~PipelineFilter(); // Defined out of line
/**
 * Configuration setup for the filter. Defined out of line; the parameter
 * list matches PipelineElement::setupConfiguration().
 */
88  bool setupConfiguration(ManagementClient *mgtClient, std::vector<std::string>& children);
/**
 * Pass a reading set to the filter plugin.
 *
 * If a plugin is attached, the reading set is handed to its ingest() method.
 * Otherwise an error naming this filter is logged and nothing else happens.
 *
 * NOTE(review): on the no-plugin path the reading set is neither forwarded
 * nor released here - confirm who owns it in that case.
 *
 * @param readingSet	The readings to pass to the plugin
 */
89  void ingest(READINGSET *readingSet)
90  {
91  if (m_plugin)
92  {
93  m_plugin->ingest(readingSet);
94  }
95  else
96  {
97  Logger::getLogger()->error("Pipeline filter %s has no plugin associated with it.", m_name.c_str());
98  }
99  };
100  bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories); // Defined out of line; same parameters as PipelineElement::setup()
101  bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output); // Defined out of line; same parameters as PipelineElement::init()
102  void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler); // Defined out of line; same parameters as PipelineElement::shutdown()
103  void reconfigure(const std::string& newConfig); // Defined out of line; same parameter as PipelineElement::reconfigure()
104  bool isFilter() { return true; }; // A PipelineFilter always reports itself as a filter
105  std::string getCategoryName() { return m_categoryName; }; // Returns a copy of m_categoryName
/**
 * Ask the filter plugin whether it persists data.
 *
 * ingest() treats a missing plugin as a reachable state, so the same case is
 * guarded here instead of dereferencing a null pointer.
 *
 * @return	The plugin's persistData() result, or false when no plugin is attached
 */
106  bool persistData() { return m_plugin && m_plugin->persistData(); };
107  void setPluginData(PluginData *data) { m_plugin->m_plugin_data = data; };
/**
 * Load the stored data for this filter through the plugin's PluginData
 * object. The lookup key is the service name followed by the filter name.
 *
 * Both the plugin and its m_plugin_data pointer are checked before use, so
 * calling this before a plugin or a PluginData object has been attached no
 * longer dereferences a null pointer.
 *
 * @return	The string returned by loadStoredData(), or an empty string
 *		when there is no plugin or no plugin data object
 */
108  std::string getPluginData() { return (m_plugin && m_plugin->m_plugin_data) ? m_plugin->m_plugin_data->loadStoredData(m_serviceName + m_name) : std::string(); };
/**
 * Store the service name in this class's own m_serviceName member, the one
 * getPluginData() uses when building its key.
 * NOTE(review): PipelineElement also has setService() writing a member of the
 * same name; if this class derives from PipelineElement the two setters
 * write different variables - confirm which one callers use.
 */
109  void setServiceName(const std::string& name) { m_serviceName = name; };
110  std::string getName() { return m_name; }; // Returns a copy of the filter instance name
111  bool isReady() { return true; }; // A filter element always reports ready
112  private:
113  PLUGIN_HANDLE loadFilterPlugin(const std::string& filterName); // Defined out of line; returns a plugin handle for the named filter
114  private:
115  std::string m_name; // The name of the filter instance
116  std::string m_categoryName; // Returned by getCategoryName()
117  std::string m_pluginName; // NOTE(review): presumably the name of the plugin to load - confirm
118  PLUGIN_HANDLE m_handle; // NOTE(review): presumably the handle returned by loadFilterPlugin() - confirm
119  FilterPlugin *m_plugin; // The filter plugin; ingest() handles the case where this is null
120  std::string m_serviceName; // Written by setServiceName(), read by getPluginData(). NOTE(review): if this class derives from PipelineElement this hides the protected member of the same name - confirm
121  ConfigCategory m_updatedCfg; // NOTE(review): not used in this header; purpose to be confirmed in the implementation file
122 };
123 
128  public:
130  ~PipelineBranch(); // Defined out of line
131  void ingest(READINGSET *readingSet); // Defined out of line; same parameter as PipelineElement::ingest()
132  std::string getName() { return "Branch"; }; // Every branch element reports the fixed name "Branch"
133  bool setupConfiguration(ManagementClient *mgtClient, std::vector<std::string>& children); // Defined out of line; same parameters as PipelineElement::setupConfiguration()
134  bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories); // Defined out of line; same parameters as PipelineElement::setup()
135  bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output); // Defined out of line; same parameters as PipelineElement::init()
136  void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler); // Defined out of line; same parameters as PipelineElement::shutdown()
137  bool isReady(); // Defined out of line
/**
 * Report that this element is a branch.
 *
 * @return	Always true
 */
138  bool isBranch()
139  {
140  return true;
141  };
/**
 * Give access to the vector of elements held by this branch.
 *
 * A mutable reference to the internal vector is returned, so callers can
 * both read and modify the branch contents through it.
 *
 * @return	Reference to m_branch
 */
142  std::vector<PipelineElement *>&
143  getBranchElements()
144  {
145  return m_branch;
146  };
/**
 * Store three opaque pointers for later use by the branch.
 *
 * NOTE(review): the parameter names suggest these are callback function
 * pointers (pass onward, use data, ingest), but their real types and how
 * they are invoked are not visible in this header - confirm in the
 * implementation file.
 *
 * @param onward	Stored in m_passOnward
 * @param use		Stored in m_useData
 * @param ingest	Stored in m_ingest
 */
147  void setFunctions(void *onward, void *use, void *ingest)
148  {
149  m_passOnward = onward;
150  m_useData = use;
151  m_ingest = ingest;
152  };
153  private:
154  static void branchHandler(void *instance); // Defined out of line. NOTE(review): presumably the thread entry point that calls handler() on instance - confirm
155  void handler(); // Defined out of line
156  private:
157  std::vector<PipelineElement *> m_branch; // The elements of this branch; exposed by getBranchElements()
158  std::thread *m_thread; // NOTE(review): presumably the thread running the branch handler - confirm creation and join in the implementation file
159  std::queue<READINGSET *> m_queue; // Queue of reading set pointers. NOTE(review): presumably filled by ingest() and drained by handler() - confirm
160  std::mutex m_mutex; // NOTE(review): presumably guards m_queue - confirm
161  std::condition_variable m_cv; // NOTE(review): presumably signals queue activity to the handler - confirm
162  void *m_passOnward; // Set by setFunctions() from its 'onward' argument
163  void *m_useData; // Set by setFunctions() from its 'use' argument
164  void *m_ingest; // Set by setFunctions() from its 'ingest' argument
165  bool m_shutdownCalled; // NOTE(review): plain bool; if it is read and written from different threads, confirm it is always accessed under m_mutex
166  FilterPipeline *m_pipeline; // NOTE(review): presumably the pipeline this branch belongs to - confirm
167 };
168 
173  public:
// NOTE(review): no getName() is declared in the visible lines of this class. If it derives from
// PipelineElement, whose getName() is pure virtual, the class would be abstract - confirm.
174  PipelineWriter(); // Defined out of line
175  void ingest(READINGSET *readingSet); // Defined out of line; same parameter as PipelineElement::ingest()
176  bool setup(ManagementClient *mgmt, void *ingest, std::map<std::string, PipelineElement*>& categories); // Defined out of line; same parameters as PipelineElement::setup()
177  bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output); // Defined out of line; same parameters as PipelineElement::init()
178  void shutdown(ServiceHandler *serviceHandler, ConfigHandler *configHandler); // Defined out of line; same parameters as PipelineElement::shutdown()
179  bool isReady(); // Defined out of line
180 };
181 
182 #endif
std::string loadStoredData(const std::string &key)
Load stored data for a given key.
Definition: plugin_data.cpp:34
Definition: config_category.h:56
Definition: filter_plugin.h:27
A pipeline element that runs a filter plugin.
Definition: pipeline_element.h:84
A pipeline element that writes to a storage service or buffer.
Definition: pipeline_element.h:172
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
Reading set class.
Definition: reading_set.h:26
Definition: plugin_data.h:15
The base pipeline element class.
Definition: pipeline_element.h:28
Client for accessing the storage service.
Definition: storage_client.h:43
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
A pipeline element that represents a branch in the pipeline.
Definition: pipeline_element.h:127
Handler class within a service to manage configuration changes.
Definition: config_handler.h:25