 |
Fledge
An open source edge computing platform for industrial users
|
7 #include <condition_variable>
9 #include <storage_client.h>
11 #include <filter_pipeline.h>
12 #include <service_handler.h>
13 #include <perfmonitors.h>
15 #define DEFAULT_BLOCK_SIZE 100
26 DataLoad(
const std::string& name,
long streamId,
42 bool isRunning() {
// Running means shutdown has not been flagged: returns the negation of m_shutdown.
return !m_shutdown; };
43 void configChange(
const std::string& category,
const std::string& newConfig);
44 void configChildCreate(
const std::string& ,
const std::string&,
const std::string&){}; // Empty body: the three unnamed string arguments are ignored.
45 void configChildDelete(
const std::string& ,
const std::string&){}; // Empty body: both unnamed string arguments are ignored.
46 unsigned long getLastFetched() {
// Accessor: returns the current value of m_lastFetched unchanged.
return m_lastFetched; };
47 void setBlockSize(
unsigned long blockSize)
49 m_blockSize = blockSize;
51 void setStreamUpdate(
unsigned long streamUpdate)
53 m_streamUpdate = streamUpdate;
54 m_nextStreamUpdate = streamUpdate;
57 const std::string &getName() {
// Returns a reference to m_name (no copy is made); m_name is itself declared as a reference member.
return m_name; };
59 void setPrefetchLimit(
unsigned int limit)
61 m_prefetchLimit = limit;
69 m_debuggerAttached =
true;
78 m_debuggerAttached =
false;
79 m_debuggerBufferSize = 1;
83 void setDebuggerBuffer(
unsigned int size)
87 m_debuggerBufferSize = size;
91 std::string getDebuggerBuffer()
98 void isolate(
bool isolate)
100 std::lock_guard<std::mutex> guard(m_isolateMutex);
105 std::lock_guard<std::mutex> guard(m_isolateMutex);
108 void replayDebugger()
113 void suspendIngest(
bool suspend)
115 std::lock_guard<std::mutex> guard(m_suspendMutex);
116 m_suspendIngest = suspend;
121 std::lock_guard<std::mutex> guard(m_suspendMutex);
122 return m_suspendIngest;
124 void stepDebugger(
unsigned int steps)
126 std::lock_guard<std::mutex> guard(m_suspendMutex);
131 std::lock_guard<std::mutex> guard(m_suspendMutex);
132 if (m_suspendIngest && m_steps > 0)
139 void readBlock(
unsigned int blockSize);
140 unsigned int waitForReadRequest();
141 unsigned long getLastSentId();
142 int createNewStream();
143 ReadingSet *fetchStatistics(
unsigned int blockSize);
144 ReadingSet *fetchAudit(
unsigned int blockSize);
146 bool loadFilters(
const std::string& category);
149 const std::string& m_name; // NOTE(review): reference member, not a copy - the bound string must outlive this object; confirm at the constructor call site.
152 volatile bool m_shutdown; // Read by isRunning(). NOTE(review): volatile provides no atomicity or inter-thread ordering in C++; if this flag is shared with the thread held in m_thread, std::atomic<bool> may be what is intended - confirm.
153 std::thread *m_thread;
155 std::condition_variable m_cv;
156 std::condition_variable m_fetchCV;
157 unsigned int m_readRequest;
158 enum { SourceReadings, SourceStatistics, SourceAudit }
160 unsigned long m_lastFetched;
161 std::deque<ReadingSet *>
165 std::mutex m_pipelineMutex;
166 unsigned long m_blockSize;
169 unsigned long m_streamSent;
170 int m_nextStreamUpdate; // NOTE(review): signed int, but setStreamUpdate() assigns an unsigned long to it - possible narrowing; confirm the intended value range.
171 unsigned int m_prefetchLimit;
172 bool m_flushRequired;
173 std::mutex m_isolateMutex;
175 bool m_debuggerAttached;
176 unsigned int m_debuggerBufferSize;
177 bool m_suspendIngest;
178 unsigned int m_steps;
179 std::mutex m_suspendMutex;
bool setDataSource(const std::string &source)
Set the source of data for the service.
Definition: data_load.cpp:108
void updateLastSentId(unsigned long id)
Update the last sent ID for our stream.
Definition: data_load.cpp:509
static void pipelineEnd(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current readings (they have been filtered by all filters)
Definition: data_load.cpp:628
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:520
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:419
virtual ~DataLoad()
DataLoad destructor.
Definition: data_load.cpp:56
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:491
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:463
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
void restart()
External call to restart the north service.
Definition: data_load.cpp:98
Reading set class.
Definition: reading_set.h:26
ReadingSet * fetchReadings(bool wait)
Fetch Readings.
Definition: data_load.cpp:428
void configChange(const std::string &category, const std::string &newConfig)
Configuration change for one of the filters or to the pipeline.
Definition: data_load.cpp:675
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: data_load.cpp:598
void shutdown()
External call to shutdown the north service.
Definition: data_load.cpp:88
A class used in the North service to load data from the buffer.
Definition: data_load.h:24
DataLoad(const std::string &name, long streamId, StorageClient *storage)
DataLoad Constructor.
Definition: data_load.cpp:30
void flushLastSentId()
Flush the last sent ID to the storage layer.
Definition: data_load.cpp:523
Client for accessing the storage service.
Definition: storage_client.h:43
void triggerRead(unsigned int blockSize)
Trigger the loading thread to read a block of data.
Definition: data_load.cpp:167
void loadThread()
The background thread that loads data from the database.
Definition: data_load.cpp:128
void replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:612
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20