Fledge
An open source edge computing platform for industrial users
data_load.h
1 #ifndef _DATA_LOAD_H
2 #define _DATA_LOAD_H
3 
4 #include <string>
5 #include <thread>
6 #include <mutex>
7 #include <condition_variable>
8 #include <deque>
9 #include <storage_client.h>
10 #include <reading.h>
11 #include <filter_pipeline.h>
12 #include <service_handler.h>
13 #include <perfmonitors.h>
14 
15 #define DEFAULT_BLOCK_SIZE 100
16 
24 class DataLoad : public ServiceHandler {
25  public:
26  DataLoad(const std::string& name, long streamId,
27  StorageClient *storage);
28  virtual ~DataLoad();
29 
30  void loadThread();
31  bool setDataSource(const std::string& source);
32  void triggerRead(unsigned int blockSize);
33  void updateLastSentId(unsigned long id);
34  void flushLastSentId();
35  ReadingSet *fetchReadings(bool wait);
36  static void passToOnwardFilter(OUTPUT_HANDLE *outHandle,
37  READINGSET* readings);
38  static void pipelineEnd(OUTPUT_HANDLE *outHandle,
39  READINGSET* readings);
40  void shutdown();
41  void restart();
42  bool isRunning() { return !m_shutdown; };
43  void configChange(const std::string& category, const std::string& newConfig);
44  void configChildCreate(const std::string& , const std::string&, const std::string&){};
45  void configChildDelete(const std::string& , const std::string&){};
46  unsigned long getLastFetched() { return m_lastFetched; };
47  void setBlockSize(unsigned long blockSize)
48  {
49  m_blockSize = blockSize;
50  };
51  void setStreamUpdate(unsigned long streamUpdate)
52  {
53  m_streamUpdate = streamUpdate;
54  m_nextStreamUpdate = streamUpdate;
55  };
56  void setPerfMonitor(PerformanceMonitor *perfMonitor) { m_perfMonitor = perfMonitor; };
57  const std::string &getName() { return m_name; };
58  StorageClient *getStorage() { return m_storage; };
59  void setPrefetchLimit(unsigned int limit)
60  {
61  m_prefetchLimit = limit;
62  };
63 
64  private:
65  void readBlock(unsigned int blockSize);
66  unsigned int waitForReadRequest();
67  unsigned long getLastSentId();
68  int createNewStream();
69  ReadingSet *fetchStatistics(unsigned int blockSize);
70  ReadingSet *fetchAudit(unsigned int blockSize);
71  void bufferReadings(ReadingSet *readings);
72  bool loadFilters(const std::string& category);
73 
74  private:
75  const std::string& m_name;
76  long m_streamId;
77  StorageClient *m_storage;
78  volatile bool m_shutdown;
79  std::thread *m_thread;
80  std::mutex m_mutex;
81  std::condition_variable m_cv;
82  std::condition_variable m_fetchCV;
83  unsigned int m_readRequest;
84  enum { SourceReadings, SourceStatistics, SourceAudit }
85  m_dataSource;
86  unsigned long m_lastFetched;
87  std::deque<ReadingSet *>
88  m_queue;
89  std::mutex m_qMutex;
90  FilterPipeline *m_pipeline;
91  std::mutex m_pipelineMutex;
92  unsigned long m_blockSize;
93  PerformanceMonitor *m_perfMonitor;
94  int m_streamUpdate;
95  unsigned long m_streamSent;
96  int m_nextStreamUpdate;
97  unsigned int m_prefetchLimit;
98  bool m_flushRequired;
99 };
100 #endif
A class used in the North service to load data from the buffer.
Definition: data_load.h:24
static void pipelineEnd(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current readings (they have been filtered by all filters)
Definition: data_load.cpp:611
Class to handle the performance monitors.
Definition: perfmonitors.h:35
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
void shutdown()
External call to shutdown the north service.
Definition: data_load.cpp:87
Reading set class.
Definition: reading_set.h:26
ReadingSet * fetchReadings(bool wait)
Fetch Readings.
Definition: data_load.cpp:417
Client for accessing the storage service.
Definition: storage_client.h:43
void flushLastSentId()
Flush the last sent Id to the storage layer.
Definition: data_load.cpp:512
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
bool setDataSource(const std::string &source)
Set the source of data for the service.
Definition: data_load.cpp:107
virtual ~DataLoad()
DataLoad destructor.
Definition: data_load.cpp:55
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: data_load.cpp:581
void updateLastSentId(unsigned long id)
Update the last sent ID for our stream.
Definition: data_load.cpp:498
void loadThread()
The background thread that loads data from the database.
Definition: data_load.cpp:127
void triggerRead(unsigned int blockSize)
Trigger the loading thread to read a block of data.
Definition: data_load.cpp:166
void configChange(const std::string &category, const std::string &newConfig)
Configuration change for one of the filters or to the pipeline.
Definition: data_load.cpp:652
DataLoad(const std::string &name, long streamId, StorageClient *storage)
DataLoad Constructor.
Definition: data_load.cpp:30
void restart()
External call to restart the north service.
Definition: data_load.cpp:97