Fledge
An open source edge computing platform for industrial users
data_load.h
1 #ifndef _DATA_LOAD_H
2 #define _DATA_LOAD_H
3 
4 #include <string>
5 #include <thread>
6 #include <mutex>
7 #include <condition_variable>
8 #include <deque>
9 #include <storage_client.h>
10 #include <reading.h>
11 #include <filter_pipeline.h>
12 #include <service_handler.h>
13 #include <perfmonitors.h>
14 
15 #define DEFAULT_BLOCK_SIZE 100
16 
24 class DataLoad : public ServiceHandler {
25  public:
26  DataLoad(const std::string& name, long streamId,
27  StorageClient *storage);
28  virtual ~DataLoad();
29 
30  void loadThread();
31  bool setDataSource(const std::string& source);
32  void triggerRead(unsigned int blockSize);
33  void updateLastSentId(unsigned long id);
34  void flushLastSentId();
35  ReadingSet *fetchReadings(bool wait);
36  static void passToOnwardFilter(OUTPUT_HANDLE *outHandle,
37  READINGSET* readings);
38  static void pipelineEnd(OUTPUT_HANDLE *outHandle,
39  READINGSET* readings);
40  void shutdown();
41  void restart();
42  bool isRunning() { return !m_shutdown; };
43  void configChange(const std::string& category, const std::string& newConfig);
44  void configChildCreate(const std::string& , const std::string&, const std::string&){};
45  void configChildDelete(const std::string& , const std::string&){};
46  unsigned long getLastFetched() { return m_lastFetched; };
47  void setBlockSize(unsigned long blockSize)
48  {
49  m_blockSize = blockSize;
50  };
51  void setStreamUpdate(unsigned long streamUpdate)
52  {
53  m_streamUpdate = streamUpdate;
54  m_nextStreamUpdate = streamUpdate;
55  };
56  void setPerfMonitor(PerformanceMonitor *perfMonitor) { m_perfMonitor = perfMonitor; };
57  const std::string &getName() { return m_name; };
58  StorageClient *getStorage() { return m_storage; };
59  void setPrefetchLimit(unsigned int limit)
60  {
61  m_prefetchLimit = limit;
62  };
63 
64  // Debugger entry points
65  bool attachDebugger()
66  {
67  if (m_pipeline)
68  {
69  m_debuggerAttached = true;
70  return m_pipeline->attachDebugger();
71  }
72  return false;
73  };
74  void detachDebugger()
75  {
76  if (m_pipeline)
77  {
78  m_debuggerAttached = false;
79  m_debuggerBufferSize = 1;
80  m_pipeline->detachDebugger();
81  }
82  };
83  void setDebuggerBuffer(unsigned int size)
84  {
85  if (m_pipeline)
86  {
87  m_debuggerBufferSize = size;
88  m_pipeline->setDebuggerBuffer(size);
89  }
90  };
91  std::string getDebuggerBuffer()
92  {
93  std::string rval;
94  if (m_pipeline)
95  rval = m_pipeline->getDebuggerBuffer();
96  return rval;
97  };
98  void isolate(bool isolate)
99  {
100  std::lock_guard<std::mutex> guard(m_isolateMutex);
101  m_isolate = isolate;
102  };
103  bool isolated()
104  {
105  std::lock_guard<std::mutex> guard(m_isolateMutex);
106  return m_isolate;
107  };
108  void replayDebugger()
109  {
110  if (m_pipeline)
111  m_pipeline->replayDebugger();
112  };
113  void suspendIngest(bool suspend)
114  {
115  std::lock_guard<std::mutex> guard(m_suspendMutex);
116  m_suspendIngest = suspend;
117  m_steps = 0;
118  };
119  bool isSuspended()
120  {
121  std::lock_guard<std::mutex> guard(m_suspendMutex);
122  return m_suspendIngest;
123  };
124  void stepDebugger(unsigned int steps)
125  {
126  std::lock_guard<std::mutex> guard(m_suspendMutex);
127  m_steps = steps;
128  };
129  bool willStep()
130  {
131  std::lock_guard<std::mutex> guard(m_suspendMutex);
132  if (m_suspendIngest && m_steps > 0)
133  {
134  return true;
135  }
136  return false;
137  };
138  private:
139  void readBlock(unsigned int blockSize);
140  unsigned int waitForReadRequest();
141  unsigned long getLastSentId();
142  int createNewStream();
143  ReadingSet *fetchStatistics(unsigned int blockSize);
144  ReadingSet *fetchAudit(unsigned int blockSize);
145  void bufferReadings(ReadingSet *readings);
146  bool loadFilters(const std::string& category);
147 
148  private:
149  const std::string& m_name;
150  long m_streamId;
151  StorageClient *m_storage;
152  volatile bool m_shutdown;
153  std::thread *m_thread;
154  std::mutex m_mutex;
155  std::condition_variable m_cv;
156  std::condition_variable m_fetchCV;
157  unsigned int m_readRequest;
158  enum { SourceReadings, SourceStatistics, SourceAudit }
159  m_dataSource;
160  unsigned long m_lastFetched;
161  std::deque<ReadingSet *>
162  m_queue;
163  std::mutex m_qMutex;
164  FilterPipeline *m_pipeline;
165  std::mutex m_pipelineMutex;
166  unsigned long m_blockSize;
167  PerformanceMonitor *m_perfMonitor;
168  int m_streamUpdate;
169  unsigned long m_streamSent;
170  int m_nextStreamUpdate;
171  unsigned int m_prefetchLimit;
172  bool m_flushRequired;
173  std::mutex m_isolateMutex;
174  bool m_isolate;
175  bool m_debuggerAttached;
176  unsigned int m_debuggerBufferSize;
177  bool m_suspendIngest;
178  unsigned int m_steps;
179  std::mutex m_suspendMutex;
180 };
181 #endif
DataLoad::setDataSource
bool setDataSource(const std::string &source)
Set the source of data for the service.
Definition: data_load.cpp:108
DataLoad::updateLastSentId
void updateLastSentId(unsigned long id)
Update the last sent ID for our stream.
Definition: data_load.cpp:509
DataLoad::pipelineEnd
static void pipelineEnd(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current readings (they have been filtered by all filters)
Definition: data_load.cpp:628
FilterPipeline::getDebuggerBuffer
std::string getDebuggerBuffer()
Get the debugger buffer contents for all the pipeline elements.
Definition: filter_pipeline.cpp:520
FilterPipeline::attachDebugger
bool attachDebugger()
Attach the debugger to the pipeline elements.
Definition: filter_pipeline.cpp:419
DataLoad::~DataLoad
virtual ~DataLoad()
DataLoad destructor.
Definition: data_load.cpp:56
FilterPipeline::setDebuggerBuffer
void setDebuggerBuffer(unsigned int size)
Set the debugger buffer size to the pipeline elements.
Definition: filter_pipeline.cpp:491
FilterPipeline::detachDebugger
void detachDebugger()
Detach the debugger from the pipeline elements.
Definition: filter_pipeline.cpp:463
FilterPipeline
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service.
Definition: filter_pipeline.h:28
DataLoad::restart
void restart()
External call to restart the north service.
Definition: data_load.cpp:98
ReadingSet
Reading set class.
Definition: reading_set.h:26
DataLoad::fetchReadings
ReadingSet * fetchReadings(bool wait)
Fetch Readings.
Definition: data_load.cpp:428
DataLoad::configChange
void configChange(const std::string &category, const std::string &newConfig)
Configuration change for one of the filters or to the pipeline.
Definition: data_load.cpp:675
DataLoad::passToOnwardFilter
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: data_load.cpp:598
DataLoad::shutdown
void shutdown()
External call to shutdown the north service.
Definition: data_load.cpp:88
DataLoad
A class used in the North service to load data from the buffer.
Definition: data_load.h:24
DataLoad::DataLoad
DataLoad(const std::string &name, long streamId, StorageClient *storage)
DataLoad Constructor.
Definition: data_load.cpp:30
DataLoad::flushLastSentId
void flushLastSentId()
Flush the last sent Id to the storeage layer.
Definition: data_load.cpp:523
StorageClient
Client for accessing the storage service.
Definition: storage_client.h:43
DataLoad::triggerRead
void triggerRead(unsigned int blockSize)
Trigger the loading thread to read a block of data.
Definition: data_load.cpp:167
DataLoad::loadThread
void loadThread()
The background thread that loads data from the database.
Definition: data_load.cpp:128
FilterPipeline::replayDebugger
void replayDebugger()
Replay the data in the first saved buffer to the filter pipeline.
Definition: filter_pipeline.cpp:612
PerformanceMonitor
Class to handle the performance monitors.
Definition: perfmonitors.h:35
ServiceHandler
ServiceHandler abstract class - the interface that services using the management API must provide.
Definition: service_handler.h:20