Fledge
An open source edge computing platform for industrial users
filter_pipeline.h
1 #ifndef _FILTER_PIPELINE_H
2 #define _FILTER_PIPELINE_H
3 /*
4  * Fledge filter pipeline class.
5  *
6  * Copyright (c) 2018 Dianomic Systems
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Amandeep Singh Arora
11  */
12 #include <plugin.h>
13 #include <plugin_manager.h>
14 #include <config_category.h>
15 #include <management_client.h>
16 #include <plugin_data.h>
17 #include <reading_set.h>
18 #include <filter_plugin.h>
19 #include <service_handler.h>
20 #include <pipeline_element.h>
21 using filterReadingSetFn = void (*)(OUTPUT_HANDLE *outHandle, READINGSET* readings); // Pointer-to-function type: takes an output handle and a reading set, returns nothing
22 
29 {
30 
31 public:
33  StorageClient& storage,
34  std::string serviceName);
36  PipelineElement *getFirstFilterPlugin()
37  {
38  // First element of the trunk pipeline (m_filters), or a null pointer when it holds no elements
39  return m_filters.empty() ? nullptr : m_filters.front();
40  }
41  unsigned int getFilterCount() { return static_cast<unsigned int>(m_filters.size()); } // Number of elements held in m_filters (the trunk pipeline)
42  void configChange(const std::string&, const std::string&); // Configuration change for one of the filters; the meaning of the two unnamed string arguments is not visible in this header
43 
44  // Clean up all the loaded filters
45  void cleanupFilters(const std::string& categoryName);
46  // Load all filter plugins in the pipeline, as specified in the configuration (return value presumably reports success - confirm in filter_pipeline.cpp)
47  bool loadFilters(const std::string& categoryName);
48  // Set up the filter pipeline. All three arguments are untyped pointers; their expected concrete types are not declared in this header
49  bool setupFiltersPipeline(void *passToOnwardFilter,
50  void *useFilteredData,
51  void *ingest);
52  // Check FilterPipeline is ready for data ingest (returns m_ready); const so it can be called through a const pipeline
53  bool isReady() const { return m_ready; }
54  bool hasChanged(const std::string& pipeline) const { return m_pipeline != pipeline; } // True when the supplied string differs from the stored m_pipeline; taken by const reference so no copy is made per call
55  bool isShuttingDown() const { return m_shutdown; } // Reports the m_shutdown flag, which setShuttingDown() sets to true; const because it only reads state
56  void setShuttingDown() { m_shutdown = true; } // Mark the pipeline as shutting down; nothing in this header resets the flag
57  void execute(); // Called when we pass the data into the pipeline
58  void awaitCompletion(); // Wait for all active branches of the pipeline to complete
59  void startBranch(); // A new branch has started in the pipeline
60  void completeBranch(); // A branch in the pipeline has completed
61  // The filter pipeline debugger entry points
62  bool attachDebugger(); // Attach the debugger to the pipeline elements
63  void detachDebugger(); // Detach the debugger from the pipeline elements
64  void setDebuggerBuffer(unsigned int size); // Set the debugger buffer size on the pipeline elements
65  std::string getDebuggerBuffer(); // Get the debugger buffer contents for all the pipeline elements
66  std::string getDebuggerBuffer(const std::string& name); // Overload taking a name; presumably limits the output to one named element - confirm in filter_pipeline.cpp
67  void replayDebugger(); // Replay the data in the first saved buffer to the filter pipeline
68 
69 private:
70  PLUGIN_HANDLE loadFilterPlugin(const std::string& filterName); // Returns a plugin handle for the named filter; the value returned on failure is not visible in this header
71  void loadPipeline(const rapidjson::Value& filters, std::vector<PipelineElement *>& pipeline); // `pipeline` is a non-const reference, presumably filled from the JSON value `filters` - confirm
72  bool attachDebugger(const std::vector<PipelineElement *>& pipeline); // Overloads on lines 72-75 take an explicit element vector; presumably helpers behind the public no-argument versions - confirm
73  void detachDebugger(const std::vector<PipelineElement *>& pipeline);
74  void setDebuggerBuffer(const std::vector<PipelineElement *>& pipeline, unsigned int size);
75  std::string getDebuggerBuffer(const std::vector<PipelineElement *>& pipeline);
76  std::string readingsToJSON(std::vector<std::shared_ptr<Reading>> readings); // Takes the vector of shared Reading pointers by value and returns a string (presumably JSON text - confirm)
77 
78 protected:
79  ManagementClient* mgtClient;
80  StorageClient& storage; // Reference member: the referenced StorageClient must outlive this object
81  std::string serviceName;
82  std::vector<PipelineElement *>
83  m_filters; // Elements in the "trunk" pipeline
84  std::map<std::string, PipelineElement *>
85  m_filterCategories; // Maps a string key to a pipeline element; presumably keyed by filter category name - confirm
86  std::string m_pipeline; // The string hasChanged() compares its argument against
87  bool m_ready; // Returned by isReady()
88  bool m_shutdown; // Set by setShuttingDown(), read by isShuttingDown()
89  ServiceHandler *m_serviceHandler;
90  int m_activeBranches; // NOTE(review): lines 90-92 presumably back startBranch()/completeBranch()/awaitCompletion() - confirm in filter_pipeline.cpp
91  std::mutex m_actives;
92  std::condition_variable m_branchActivations;
93 };
94 
95 #endif
FilterPipeline::startBranch
void startBranch()
A new branch has started in the pipeline.
Definition: filter_pipeline.cpp:395
FilterPipeline::execute
void execute()
Called when we pass the data into the pipeline.
Definition: filter_pipeline.cpp:374
FilterPipeline::getDebuggerBuffer
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:520
FilterPipeline::attachDebugger
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:419
FilterPipeline::setDebuggerBuffer
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:491
FilterPipeline::cleanupFilters
void cleanupFilters(const std::string &categoryName)
Cleanup all the loaded filters.
Definition: filter_pipeline.cpp:325
FilterPipeline::detachDebugger
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:463
FilterPipeline::~FilterPipeline
~FilterPipeline()
FilterPipeline destructor.
Definition: filter_pipeline.cpp:39
FilterPipeline
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
ReadingSet
Reading set class.
Definition: reading_set.h:26
FilterPipeline::awaitCompletion
void awaitCompletion()
Wait for all active branches of the pipeline to complete.
Definition: filter_pipeline.cpp:383
FilterPipeline::completeBranch
void completeBranch()
A branch in the pipeline has completed.
Definition: filter_pipeline.cpp:404
ManagementClient
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
FilterPipeline::setupFiltersPipeline
bool setupFiltersPipeline(void *passToOnwardFilter, void *useFilteredData, void *ingest)
Set the filter pipeline.
Definition: filter_pipeline.cpp:242
StorageClient
Client for accessing the storage service.
Definition: storage_client.h:43
FilterPipeline::FilterPipeline
FilterPipeline(ManagementClient *mgtClient, StorageClient &storage, std::string serviceName)
FilterPipeline class constructor.
Definition: filter_pipeline.cpp:31
FilterPipeline::loadFilters
bool loadFilters(const std::string &categoryName)
Load all filter plugins in the pipeline.
Definition: filter_pipeline.cpp:78
PipelineElement
The base pipeline element class.
Definition: pipeline_element.h:29
FilterPipeline::replayDebugger
void replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:612
FilterPipeline::configChange
void configChange(const std::string &, const std::string &)
Configuration change for one of the filters.
Definition: filter_pipeline.cpp:361
ServiceHandler
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20