 |
Fledge
An open source edge computing platform for industrial users
|
7 #include <condition_variable>
9 #include <storage_client.h>
11 #include <filter_pipeline.h>
12 #include <service_handler.h>
13 #include <perfmonitors.h>
15 #define DEFAULT_BLOCK_SIZE 100
26 DataLoad(
const std::string& name,
long streamId,
42 bool isRunning() {
return !m_shutdown; };
43 void configChange(
const std::string& category,
const std::string& newConfig);
44 void configChildCreate(
const std::string& ,
const std::string&,
const std::string&){};
45 void configChildDelete(
const std::string& ,
const std::string&){};
46 unsigned long getLastFetched() {
return m_lastFetched; };
47 void setBlockSize(
unsigned long blockSize)
49 m_blockSize = blockSize;
51 void setStreamUpdate(
unsigned long streamUpdate)
53 m_streamUpdate = streamUpdate;
54 m_nextStreamUpdate = streamUpdate;
57 const std::string &getName() {
return m_name; };
59 void setPrefetchLimit(
unsigned int limit)
61 m_prefetchLimit = limit;
69 m_debuggerAttached =
true;
78 m_debuggerAttached =
false;
79 m_debuggerBufferSize = 1;
83 void setDebuggerBuffer(
unsigned int size)
87 m_debuggerBufferSize = size;
91 std::string getDebuggerBuffer()
98 void isolate(
bool isolate)
100 std::lock_guard<std::mutex> guard(m_isolateMutex);
105 std::lock_guard<std::mutex> guard(m_isolateMutex);
108 bool replayDebugger()
119 void suspendIngest(
bool suspend)
121 std::lock_guard<std::mutex> guard(m_suspendMutex);
122 m_suspendIngest = suspend;
127 std::lock_guard<std::mutex> guard(m_suspendMutex);
128 return m_suspendIngest;
130 void stepDebugger(
unsigned int steps)
132 std::lock_guard<std::mutex> guard(m_suspendMutex);
137 std::lock_guard<std::mutex> guard(m_suspendMutex);
138 if (m_suspendIngest && m_steps > 0)
145 void readBlock(
unsigned int blockSize);
146 unsigned int waitForReadRequest();
147 unsigned long getLastSentId();
148 int createNewStream();
149 ReadingSet *fetchStatistics(
unsigned int blockSize);
150 ReadingSet *fetchAudit(
unsigned int blockSize);
152 bool loadFilters(
const std::string& category);
155 const std::string& m_name;
158 volatile bool m_shutdown;
159 std::thread *m_thread;
161 std::condition_variable m_cv;
162 std::condition_variable m_fetchCV;
163 unsigned int m_readRequest;
164 enum { SourceReadings, SourceStatistics, SourceAudit }
166 unsigned long m_lastFetched;
167 std::deque<ReadingSet *>
171 std::mutex m_pipelineMutex;
172 unsigned long m_blockSize;
175 unsigned long m_streamSent;
176 int m_nextStreamUpdate;
177 unsigned int m_prefetchLimit;
178 bool m_flushRequired;
179 std::mutex m_isolateMutex;
181 bool m_debuggerAttached;
182 unsigned int m_debuggerBufferSize;
183 bool m_suspendIngest;
184 unsigned int m_steps;
185 std::mutex m_suspendMutex;
bool setDataSource(const std::string &source)
Set the source of data for the service.
Definition: data_load.cpp:108
void updateLastSentId(unsigned long id)
Update the last sent ID for our stream.
Definition: data_load.cpp:509
static void pipelineEnd(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current readings (they have been filtered by all filters)
Definition: data_load.cpp:628
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:527
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:421
virtual ~DataLoad()
DataLoad destructor.
Definition: data_load.cpp:56
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:498
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:470
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
void restart()
External call to restart the north service.
Definition: data_load.cpp:98
Reading set class.
Definition: reading_set.h:26
ReadingSet * fetchReadings(bool wait)
Fetch Readings.
Definition: data_load.cpp:428
void configChange(const std::string &category, const std::string &newConfig)
Configuration change for one of the filters or to the pipeline.
Definition: data_load.cpp:675
bool replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:621
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: data_load.cpp:598
void shutdown()
External call to shutdown the north service.
Definition: data_load.cpp:88
A class used in the North service to load data from the buffer.
Definition: data_load.h:24
DataLoad(const std::string &name, long streamId, StorageClient *storage)
DataLoad Constructor.
Definition: data_load.cpp:30
void flushLastSentId()
Flush the last sent Id to the storeage layer.
Definition: data_load.cpp:523
Client for accessing the storage service.
Definition: storage_client.h:43
void triggerRead(unsigned int blockSize)
Trigger the loading thread to read a block of data.
Definition: data_load.cpp:167
void loadThread()
The background thread that loads data from the database.
Definition: data_load.cpp:128
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20