Fledge
An open source edge computing platform for industrial users
south_service.h
1 #ifndef _SOUTH_SERVICE_H
2 #define _SOUTH_SERVICE_H
3 /*
4  * Fledge south service.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch, Massimiliano Pinto
11  */
12 
13 #include <logger.h>
14 #include <south_plugin.h>
15 #include <service_handler.h>
16 #include <config_category.h>
17 #include <ingest.h>
18 #include <filter_plugin.h>
19 #include <plugin_data.h>
20 #include <audit_logger.h>
21 #include <perfmonitors.h>
22 
23 #define MAX_SLEEP 5 // Maximum number of seconds the service will sleep during a poll cycle
24 
25 #define SERVICE_NAME "Fledge South"
26 
27 /*
28  * Control the throttling of poll based plugins
29  *
30  * If the ingest queue grows then we reduce the poll rate, i.e. increase
31  * the interval between poll calls. If the ingest queue then drops below
32  * the threshold set in the advanced configuration we then bring the poll
33  * rate back up.
34  */
35 #define SOUTH_THROTTLE_HIGH_PERCENT 50 // Percentage above buffer threshold where we throttle down
36 #define SOUTH_THROTTLE_LOW_PERCENT 10 // Percentage above buffer threshold where we throttle up
37 #define SOUTH_THROTTLE_PERCENT 10 // The percentage we throttle poll by
38 #define SOUTH_THROTTLE_DOWN_INTERVAL 10 // Interval between throttle down attempts
39 #define SOUTH_THROTTLE_UP_INTERVAL 15 // Interval between throttle up attempts
40 
47  public:
48  SouthService(const std::string& name,
49  const std::string& token = "");
50  virtual ~SouthService();
51  void start(std::string& coreAddress,
52  unsigned short corePort);
53  void stop();
54  void shutdown();
55  void restart();
56  void configChange(const std::string&, const std::string&);
57  void processConfigChange(const std::string&, const std::string&);
58  void configChildCreate(const std::string&,
59  const std::string&,
60  const std::string&){};
61  void configChildDelete(const std::string&,
62  const std::string&){};
63  bool isRunning() { return !m_shutdown; };
64  bool setPoint(const std::string& name, const std::string& value);
65  bool operation(const std::string& name, std::vector<PLUGIN_PARAMETER *>& );
66  void setDryRun() { m_dryRun = true; };
67  void handlePendingReconf();
68 
69  private:
70  void addConfigDefaults(DefaultConfigCategory& defaults);
71  bool loadPlugin();
72  int createTimerFd(struct timeval rate);
73  void createConfigCategories(DefaultConfigCategory configCategory,
74  std::string parent_name,
75  std::string current_name);
76  void throttlePoll();
77  void processNumberList(const ConfigCategory& cateogry, const std::string& item, std::vector<unsigned long>& list);
78  void calculateTimerRate();
79  bool syncToNextPoll();
80  bool onDemandPoll();
81  void checkPendingReconfigure();
82  private:
83  std::thread *m_reconfThread;
84  std::deque<std::pair<std::string,std::string>> m_pendingNewConfig;
85  std::mutex m_pendingNewConfigMutex;
86  std::condition_variable m_cvNewReconf;
87 
88  SouthPlugin *southPlugin;
89  Logger *logger;
90  AssetTracker *m_assetTracker;
91  bool m_shutdown;
92  ConfigCategory m_config;
93  ConfigCategory m_configAdvanced;
94  unsigned long m_readingsPerSec; // May not be per second, new rate defines time units
95  unsigned int m_threshold;
96  unsigned long m_timeout;
97  Ingest *m_ingest;
98  bool m_throttle;
99  bool m_throttled;
100  unsigned int m_highWater;
101  unsigned int m_lowWater;
102  struct timeval m_lastThrottle;
103  struct timeval m_desiredRate;
104  struct timeval m_currentRate;
105  int m_timerfd;
106  const std::string m_token;
107  unsigned int m_repeatCnt;
108  PluginData *m_pluginData;
109  std::string m_dataKey;
110  bool m_dryRun;
111  bool m_requestRestart;
112  std::string m_rateUnits;
113  enum { POLL_INTERVAL, POLL_FIXED, POLL_ON_DEMAND }
114  m_pollType;
115  std::vector<unsigned long> m_hours;
116  std::vector<unsigned long> m_minutes;
117  std::vector<unsigned long> m_seconds;
118  std::string m_hoursStr;
119  std::string m_minutesStr;
120  std::string m_secondsStr;
121  std::condition_variable m_pollCV;
122  std::mutex m_pollMutex;
123  bool m_doPoll;
124  AuditLogger *m_auditLogger;
125  PerformanceMonitor *m_perfMonitor;
126 };
127 #endif
Fledge Logger class used to log to syslog.
Definition: logger.h:26
bool operation(const std::string &name, std::vector< PLUGIN_PARAMETER *> &)
Perform an operation on the south plugin.
Definition: south.cpp:1413
DefaultConfigCategory.
Definition: config_category.h:236
Definition: config_category.h:56
Class that represents a south plugin.
Definition: south_plugin.h:33
void configChange(const std::string &, const std::string &)
Configuration change notification using a separate thread.
Definition: south.cpp:1138
void restart()
Restart request.
Definition: south.cpp:896
void shutdown()
Shutdown request.
Definition: south.cpp:875
void processConfigChange(const std::string &, const std::string &)
Configuration change notification.
Definition: south.cpp:912
Class to handle the performance monitors.
Definition: perfmonitors.h:35
SouthService(const std::string &name, const std::string &token="")
Constructor for the south service.
Definition: south.cpp:247
virtual ~SouthService()
Destructor for south service.
Definition: south.cpp:275
Definition: plugin_data.h:15
void stop()
Stop the south service.
Definition: south.cpp:761
The ingest class is used to ingest asset readings.
Definition: ingest.h:56
The SouthService class.
Definition: south_service.h:46
A singleton class for access to the audit logger within services.
Definition: audit_logger.h:21
bool setPoint(const std::string &name, const std::string &value)
Perform a setPoint operation on the south plugin.
Definition: south.cpp:1393
void handlePendingReconf()
Handle configuration change notification; called by reconf thread. Waits for some reconf operation(s) ...
Definition: south.cpp:1096
The AssetTracker class provides the asset tracking functionality.
Definition: asset_tracking.h:239
ServiceAuthHandler adds security to the base class ServiceHandler.
Definition: service_handler.h:35
void start(std::string &coreAddress, unsigned short corePort)
Start the south service.
Definition: south.cpp:299