Fledge
An open source edge computing platform for industrial users
sending.h
1 #ifndef _SENDING_PROCESS_H
2 #define _SENDING_PROCESS_H
3 
4 /*
5  * Fledge process class
6  *
7  * Copyright (c) 2018 Dianomic Systems
8  *
9  * Released under the Apache 2.0 Licence
10  *
11  * Author: Massimiliano Pinto
12  */
13 
14 #include <process.h>
15 #include <thread>
16 #include <north_plugin.h>
17 #include <reading.h>
18 #include <filter_plugin.h>
19 #include <north_filter_pipeline.h>
20 #include <asset_tracking.h>
21 
22 // SendingProcess class
23 class SendingProcess : public FledgeProcess
24 {
25  public:
26  // Constructor:
27  SendingProcess(int argc, char** argv);
28 
29  // Destructor
30  ~SendingProcess();
31 
32  void run() const;
33  void stop();
34  int getStreamId() const { return m_stream_id; };
35  bool isRunning() const { if (m_dryRun) return false; return m_running; };
36  void stopRunning() { m_running = false; };
37  void setLastFetchId(unsigned long id) { m_last_fetch_id = id; };
38  unsigned long getLastFetchId() const { return m_last_fetch_id; };
39  void setLastSentId(unsigned long id) { m_last_sent_id = id; };
40  unsigned long getLastSentId() const { return m_last_sent_id; };
41 
42  unsigned long getSentReadings() const { return m_tot_sent; };
43  bool updateSentReadings(unsigned long num) {
44  m_tot_sent += num;
45  return m_tot_sent;
46  };
47  void resetSentReadings() { m_tot_sent = 0; };
48  void updateDatabaseCounters();
49  bool getLastSentReadingId();
50  bool createStream(int);
51  int createNewStream();
52  unsigned int getDuration() const { return m_duration; };
53  unsigned int getSleepTime() const { return m_sleep; };
54  bool getUpdateDb() const { return m_update_db; };
55  bool setUpdateDb(bool val) {
56  m_update_db = val;
57  return m_update_db;
58  };
59  unsigned long getReadBlockSize() const { return m_block_size; };
60  const std::string& getDataSourceType() const { return m_data_source_t; };
61  const std::string& getPluginName() const { return m_plugin_name; };
62  void setLoadBufferIndex(unsigned long loadBufferIdx);
63  unsigned long getLoadBufferIndex() const;
64  const unsigned long* getLoadBufferIndexPtr() const;
65 
66  unsigned long getMemoryBufferSize() const { return m_memory_buffer_size; };
67  void createConfigCategories(DefaultConfigCategory configCategory,
68  std::string parent_name,
69  std::string current_name,
70  std::string current_description);
71 
72  // Public static methods
73  public:
74  static void setLoadBufferData(unsigned long index,
75  ReadingSet* readings);
76  static std::vector<ReadingSet *>*
77  getDataBuffers() { return m_buffer_ptr; };
78  static void useFilteredData(OUTPUT_HANDLE *outHandle,
79  READINGSET* readings);
80  static void passToOnwardFilter(OUTPUT_HANDLE *outHandle,
81  READINGSET* readings);
82 
83  private:
84  std::string retrieveTableInformationName(const char* dataSource);
85  void updateStreamLastSentId(long lastSentId);
86  void setDuration(unsigned int val) { m_duration = val; };
87  void setSleepTime(unsigned long val) { m_sleep = val; };
88  void setReadBlockSize(unsigned long size) { m_block_size = size; };
89  bool loadPlugin(const std::string& pluginName);
90  ConfigCategory fetchConfiguration(const std::string& defCfg,
91  const std::string& pluginName);
92  bool loadFilters(const std::string& pluginName);
93  void updateStatistics(std::string& stat_key,
94  const std::string& stat_description);
95 
96  // Make private the copy constructor and operator=
97  SendingProcess(const SendingProcess &);
98  SendingProcess& operator=(SendingProcess const &);
99 
100  public:
101  std::vector<ReadingSet *> m_buffer;
102  std::thread* m_thread_load;
103  std::thread* m_thread_send;
104  NorthPlugin* m_plugin;
105  std::vector<unsigned long> m_last_read_id;
106  NorthFilterPipeline* filterPipeline;
107 
108  private:
109  bool m_running;
110  int m_stream_id;
111  unsigned long m_last_sent_id;
112  unsigned long m_last_fetch_id;
113  unsigned long m_tot_sent;
114  unsigned int m_duration;
115  unsigned long m_sleep;
116  unsigned long m_block_size;
117  bool m_update_db;
118  std::string m_plugin_name;
119  Logger* m_logger;
120  std::string m_data_source_t;
121  unsigned long m_load_buffer_index;
122  unsigned long m_memory_buffer_size = 1;
123 
124  // static pointer for data buffer access
125  static std::vector<ReadingSet *>*
126  m_buffer_ptr;
127  AssetTracker *m_assetTracker;
128 };
129 
130 #endif
Fledge Logger class used to log to syslog.
Definition: logger.h:26
unsigned long getLoadBufferIndex() const
Get the current buffer load index.
Definition: sending.cpp:1027
DefaultConfigCategory.
Definition: config_category.h:236
Definition: config_category.h:56
bool getLastSentReadingId()
Get last_object id sent for current stream_id. Access foglam.streams table.
Definition: sending.cpp:628
bool createStream(int)
Creates a new stream, it adds a new row into the streams table allocating specific stream id...
Definition: sending.cpp:717
Fledge process base class.
Definition: process.h:21
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: sending.cpp:1003
void updateDatabaseCounters()
Update database tables statistics and streams setting last_object id in streams.
Definition: sending.cpp:512
void createConfigCategories(DefaultConfigCategory configCategory, std::string parent_name, std::string current_name, std::string current_description)
Creates config categories and sub categories recursively, along with their parent-child relations...
Definition: sending.cpp:743
Reading set class.
Definition: reading_set.h:26
const unsigned long * getLoadBufferIndexPtr() const
Get the current buffer load index pointer.
Definition: sending.cpp:1041
The NorthFilterPipeline class is derived from FilterPipeline class and is used to represent a pipelin...
Definition: north_filter_pipeline.h:21
Class that represents a north plugin.
Definition: north_plugin.h:33
void setLoadBufferIndex(unsigned long loadBufferIdx)
Set the current buffer load index.
Definition: sending.cpp:1017
int createNewStream()
Creates a new stream, it adds a new row into the streams table allocating a new stream id...
Definition: sending.cpp:671
The AssetTracker class provides the asset tracking functionality.
Definition: asset_tracking.h:239
SendingProcess
Definition: sending.h:23
SendingProcess(int argc, char **argv)
Definition: sending.cpp:145
~SendingProcess()
SendingProcess class methods.
Definition: sending.cpp:137
static void useFilteredData(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current input readings (they have been filtered by all filters)
Definition: sending.cpp:984