1 #ifndef _PIPELINE_ELEMENT_H 2 #define _PIPELINE_ELEMENT_H 13 #include <config_category.h> 14 #include <management_client.h> 16 #include <plugin_manager.h> 17 #include <plugin_data.h> 18 #include <reading_set.h> 19 #include <filter_plugin.h> 20 #include <service_handler.h> 21 #include <config_handler.h> 40 void setService(
const std::string& serviceName)
42 m_serviceName = serviceName;
48 static void ingest(
void *handle,
READINGSET *readings)
53 std::vector<std::string>& )
57 virtual bool isFilter()
61 virtual bool isBranch()
65 virtual void ingest(
READINGSET *readingSet) = 0;
66 virtual bool setup(
ManagementClient *mgmt,
void *ingest, std::map<std::string, PipelineElement*>& categories) = 0;
67 virtual bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output) = 0;
69 virtual void reconfigure(
const std::string& )
72 virtual std::string getName() = 0;
73 virtual bool isReady() = 0;
75 std::string m_serviceName;
88 bool setupConfiguration(
ManagementClient *mgtClient, std::vector<std::string>& children);
93 m_plugin->ingest(readingSet);
97 Logger::getLogger()->error(
"Pipeline filter %s has no plugin associated with it.", m_name.c_str());
100 bool setup(
ManagementClient *mgmt,
void *ingest, std::map<std::string, PipelineElement*>& categories);
101 bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
103 void reconfigure(
const std::string& newConfig);
104 bool isFilter() {
return true; };
105 std::string getCategoryName() {
return m_categoryName; };
106 bool persistData() {
return m_plugin->persistData(); };
107 void setPluginData(
PluginData *data) { m_plugin->m_plugin_data = data; };
108 std::string getPluginData() {
return m_plugin->m_plugin_data->
loadStoredData(m_serviceName + m_name); };
109 void setServiceName(
const std::string& name) { m_serviceName = name; };
110 std::string getName() {
return m_name; };
111 bool isReady() {
return true; };
113 PLUGIN_HANDLE loadFilterPlugin(
const std::string& filterName);
116 std::string m_categoryName;
117 std::string m_pluginName;
118 PLUGIN_HANDLE m_handle;
120 std::string m_serviceName;
132 std::string getName() {
return "Branch"; };
133 bool setupConfiguration(
ManagementClient *mgtClient, std::vector<std::string>& children);
134 bool setup(
ManagementClient *mgmt,
void *ingest, std::map<std::string, PipelineElement*>& categories);
135 bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
142 std::vector<PipelineElement *>&
147 void setFunctions(
void *onward,
void *use,
void *ingest)
149 m_passOnward = onward;
154 static void branchHandler(
void *instance);
157 std::vector<PipelineElement *> m_branch;
158 std::thread *m_thread;
159 std::queue<READINGSET *> m_queue;
161 std::condition_variable m_cv;
165 bool m_shutdownCalled;
176 bool setup(
ManagementClient *mgmt,
void *ingest, std::map<std::string, PipelineElement*>& categories);
177 bool init(OUTPUT_HANDLE* outHandle, OUTPUT_STREAM output);
std::string loadStoredData(const std::string &key)
Load stored data for a given key.
Definition: plugin_data.cpp:34
Definition: config_category.h:56
Definition: filter_plugin.h:27
A pipeline element the runs a filter plugin.
Definition: pipeline_element.h:84
A pipeline element that writes to a storage service or buffer.
Definition: pipeline_element.h:172
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
Reading set class.
Definition: reading_set.h:26
Definition: plugin_data.h:15
The base pipeline element class.
Definition: pipeline_element.h:28
Client for accessing the storage service.
Definition: storage_client.h:43
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
A pipeline element that represents a branch in the pipeline.
Definition: pipeline_element.h:127
Handler class within a service to manage configuration changes.
Definition: config_handler.h:25