Fledge
An open source edge computing platform for industrial users
data_load.h
1 #ifndef _DATA_LOAD_H
2 #define _DATA_LOAD_H
3 
4 #include <string>
5 #include <thread>
6 #include <mutex>
7 #include <condition_variable>
8 #include <deque>
9 #include <storage_client.h>
10 #include <reading.h>
11 #include <filter_pipeline.h>
12 #include <service_handler.h>
13 #include <perfmonitors.h>
14 
15 #define DEFAULT_BLOCK_SIZE 100
16 
24 class DataLoad : public ServiceHandler {
25  public:
26  DataLoad(const std::string& name, long streamId,
27  StorageClient *storage);
28  virtual ~DataLoad();
29 
30  void loadThread();
31  bool setDataSource(const std::string& source);
32  void triggerRead(unsigned int blockSize);
33  void updateLastSentId(unsigned long id);
34  void flushLastSentId();
35  ReadingSet *fetchReadings(bool wait);
36  static void passToOnwardFilter(OUTPUT_HANDLE *outHandle,
37  READINGSET* readings);
38  static void pipelineEnd(OUTPUT_HANDLE *outHandle,
39  READINGSET* readings);
40  void shutdown();
41  void restart();
42  bool isRunning() { return !m_shutdown; };
43  void configChange(const std::string& category, const std::string& newConfig);
44  void configChildCreate(const std::string& , const std::string&, const std::string&){};
45  void configChildDelete(const std::string& , const std::string&){};
46  unsigned long getLastFetched() { return m_lastFetched; };
47  void setBlockSize(unsigned long blockSize)
48  {
49  m_blockSize = blockSize;
50  };
51  void setStreamUpdate(unsigned long streamUpdate)
52  {
53  m_streamUpdate = streamUpdate;
54  m_nextStreamUpdate = streamUpdate;
55  };
56  void setPerfMonitor(PerformanceMonitor *perfMonitor) { m_perfMonitor = perfMonitor; };
57  const std::string &getName() { return m_name; };
58  StorageClient *getStorage() { return m_storage; };
59  void setPrefetchLimit(unsigned int limit)
60  {
61  m_prefetchLimit = limit;
62  };
63 
64  // Debugger entry points
65  bool attachDebugger()
66  {
67  if (m_pipeline)
68  {
69  m_debuggerAttached = true;
70  return m_pipeline->attachDebugger();
71  }
72  return false;
73  };
74  void detachDebugger()
75  {
76  if (m_pipeline)
77  {
78  m_debuggerAttached = false;
79  m_debuggerBufferSize = 1;
80  m_pipeline->detachDebugger();
81  }
82  };
83  void setDebuggerBuffer(unsigned int size)
84  {
85  if (m_pipeline)
86  {
87  m_debuggerBufferSize = size;
88  m_pipeline->setDebuggerBuffer(size);
89  }
90  };
91  std::string getDebuggerBuffer()
92  {
93  std::string rval;
94  if (m_pipeline)
95  rval = m_pipeline->getDebuggerBuffer();
96  return rval;
97  };
98  void isolate(bool isolate)
99  {
100  std::lock_guard<std::mutex> guard(m_isolateMutex);
101  m_isolate = isolate;
102  };
103  bool isolated()
104  {
105  std::lock_guard<std::mutex> guard(m_isolateMutex);
106  return m_isolate;
107  };
108  bool replayDebugger()
109  {
110  if (m_pipeline)
111  {
112  return m_pipeline->replayDebugger();
113  }
114  else
115  {
116  return false;
117  }
118  };
119  void suspendIngest(bool suspend)
120  {
121  std::lock_guard<std::mutex> guard(m_suspendMutex);
122  m_suspendIngest = suspend;
123  m_steps = 0;
124  };
125  bool isSuspended()
126  {
127  std::lock_guard<std::mutex> guard(m_suspendMutex);
128  return m_suspendIngest;
129  };
130  void stepDebugger(unsigned int steps)
131  {
132  std::lock_guard<std::mutex> guard(m_suspendMutex);
133  m_steps = steps;
134  };
135  bool willStep()
136  {
137  std::lock_guard<std::mutex> guard(m_suspendMutex);
138  if (m_suspendIngest && m_steps > 0)
139  {
140  return true;
141  }
142  return false;
143  };
144  private:
145  void readBlock(unsigned int blockSize);
146  unsigned int waitForReadRequest();
147  unsigned long getLastSentId();
148  int createNewStream();
149  ReadingSet *fetchStatistics(unsigned int blockSize);
150  ReadingSet *fetchAudit(unsigned int blockSize);
151  void bufferReadings(ReadingSet *readings);
152  bool loadFilters(const std::string& category);
153 
154  private:
155  const std::string& m_name;
156  long m_streamId;
157  StorageClient *m_storage;
158  volatile bool m_shutdown;
159  std::thread *m_thread;
160  std::mutex m_mutex;
161  std::condition_variable m_cv;
162  std::condition_variable m_fetchCV;
163  unsigned int m_readRequest;
164  enum { SourceReadings, SourceStatistics, SourceAudit }
165  m_dataSource;
166  unsigned long m_lastFetched;
167  std::deque<ReadingSet *>
168  m_queue;
169  std::mutex m_qMutex;
170  FilterPipeline *m_pipeline;
171  std::mutex m_pipelineMutex;
172  unsigned long m_blockSize;
173  PerformanceMonitor *m_perfMonitor;
174  int m_streamUpdate;
175  unsigned long m_streamSent;
176  int m_nextStreamUpdate;
177  unsigned int m_prefetchLimit;
178  bool m_flushRequired;
179  std::mutex m_isolateMutex;
180  bool m_isolate;
181  bool m_debuggerAttached;
182  unsigned int m_debuggerBufferSize;
183  bool m_suspendIngest;
184  unsigned int m_steps;
185  std::mutex m_suspendMutex;
186 };
187 #endif
DataLoad::setDataSource
bool setDataSource(const std::string &source)
Set the source of data for the service.
Definition: data_load.cpp:108
DataLoad::updateLastSentId
void updateLastSentId(unsigned long id)
Update the last sent ID for our stream.
Definition: data_load.cpp:509
DataLoad::pipelineEnd
static void pipelineEnd(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current readings (they have been filtered by all filters)
Definition: data_load.cpp:628
FilterPipeline::getDebuggerBuffer
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:527
FilterPipeline::attachDebugger
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:421
DataLoad::~DataLoad
virtual ~DataLoad()
DataLoad destructor.
Definition: data_load.cpp:56
FilterPipeline::setDebuggerBuffer
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:498
FilterPipeline::detachDebugger
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:470
FilterPipeline
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
DataLoad::restart
void restart()
External call to restart the north service.
Definition: data_load.cpp:98
ReadingSet
Reading set class.
Definition: reading_set.h:26
DataLoad::fetchReadings
ReadingSet * fetchReadings(bool wait)
Fetch Readings.
Definition: data_load.cpp:428
DataLoad::configChange
void configChange(const std::string &category, const std::string &newConfig)
Configuration change for one of the filters or to the pipeline.
Definition: data_load.cpp:675
FilterPipeline::replayDebugger
bool replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:621
DataLoad::passToOnwardFilter
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: data_load.cpp:598
DataLoad::shutdown
void shutdown()
External call to shutdown the north service.
Definition: data_load.cpp:88
DataLoad
A class used in the North service to load data from the buffer.
Definition: data_load.h:24
DataLoad::DataLoad
DataLoad(const std::string &name, long streamId, StorageClient *storage)
DataLoad Constructor.
Definition: data_load.cpp:30
DataLoad::flushLastSentId
void flushLastSentId()
Flush the last sent Id to the storeage layer.
Definition: data_load.cpp:523
StorageClient
Client for accessing the storage service.
Definition: storage_client.h:43
DataLoad::triggerRead
void triggerRead(unsigned int blockSize)
Trigger the loading thread to read a block of data.
Definition: data_load.cpp:167
DataLoad::loadThread
void loadThread()
The background thread that loads data from the database.
Definition: data_load.cpp:128
PerformanceMonitor
Class to handle the performance monitors.
Definition: perfmonitors.h:35
ServiceHandler
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20