Fledge
An open source edge computing platform for industrial users
south_service.h
1 #ifndef _SOUTH_SERVICE_H
2 #define _SOUTH_SERVICE_H
3 /*
4  * Fledge south service.
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch, Massimiliano Pinto
11  */
12 
13 #include <logger.h>
14 #include <south_plugin.h>
15 #include <service_handler.h>
16 #include <config_category.h>
17 #include <ingest.h>
18 #include <filter_plugin.h>
19 #include <plugin_data.h>
20 #include <audit_logger.h>
21 #include <perfmonitors.h>
22 
23 #define MAX_SLEEP 5 // Maximum number of seconds the service will sleep during a poll cycle
24 
25 #define SERVICE_NAME "Fledge South"
26 
27 /*
28  * Control the throttling of poll based plugins
29  *
30  * If the ingest queue grows then we reduce the poll rate, i.e. increase
31  * the interval between poll calls. If the ingest queue then drops below
32  * the threshold set in the advance configuration we then bring the poll
33  * rate back up.
34  */
35 #define SOUTH_THROTTLE_HIGH_PERCENT 50 // Percentage above buffer threshold where we throttle down
36 #define SOUTH_THROTTLE_LOW_PERCENT 10 // Percentage above buffer threshold where we throttle up
37 #define SOUTH_THROTTLE_PERCENT 10 // The percentage we throttle poll by
38 #define SOUTH_THROTTLE_DOWN_INTERVAL 10 // Interval between throttle down attmepts
39 #define SOUTH_THROTTLE_UP_INTERVAL 15 // Interval between throttle up attempts
40 
41 
45 #define DEBUG_ATTACHED 0x01
46 #define DEBUG_SUSPENDED 0x02
47 #define DEBUG_ISOLATED 0x04
48 
49 
51 
58  public:
59  SouthService(const std::string& name,
60  const std::string& token = "");
61  virtual ~SouthService();
62  void start(std::string& coreAddress,
63  unsigned short corePort);
64  void stop();
65  void shutdown();
66  void restart();
67  void configChange(const std::string&, const std::string&);
68  void processConfigChange(const std::string&, const std::string&);
69  void configChildCreate(const std::string&,
70  const std::string&,
71  const std::string&){};
72  void configChildDelete(const std::string&,
73  const std::string&){};
74  bool isRunning() { return !m_shutdown; };
75  bool setPoint(const std::string& name, const std::string& value);
76  bool operation(const std::string& name, std::vector<PLUGIN_PARAMETER *>& );
77  void setDryRun() { m_dryRun = true; };
78  void handlePendingReconf();
79  // Debugger Entry point
80  bool attachDebugger()
81  {
82  if (m_ingest)
83  {
84  m_debugState = DEBUG_ATTACHED;
85  return m_ingest->attachDebugger();
86  }
87  return false;
88  };
89  void detachDebugger()
90  {
91  if (m_ingest)
92  m_ingest->detachDebugger();
93  suspendDebugger(false);
94  isolateDebugger(false);
95  m_debugState = 0;
96  };
97  void setDebuggerBuffer(unsigned int size)
98  {
99  if (m_ingest)
100  m_ingest->setDebuggerBuffer(size);
101  };
102  std::string getDebuggerBuffer()
103  {
104  if (m_ingest)
105  return m_ingest->getDebuggerBuffer();
106  return "";
107  };
108  void suspendDebugger(bool suspend)
109  {
110  suspendIngest(suspend);
111  if (suspend)
112  m_debugState |= DEBUG_SUSPENDED;
113  else
114  m_debugState &= ~(unsigned int)DEBUG_SUSPENDED;
115  };
116  void isolateDebugger(bool isolate)
117  {
118  if (m_ingest)
119  m_ingest->isolate(isolate);
120  if (isolate)
121  m_debugState |= DEBUG_ISOLATED;
122  else
123  m_debugState &= ~(unsigned int)DEBUG_ISOLATED;
124  };
125  void stepDebugger(unsigned int steps)
126  {
127  std::lock_guard<std::mutex> guard(m_suspendMutex);
128  m_steps = steps;
129  }
130  void replayDebugger()
131  {
132  if (m_ingest)
133  m_ingest->replayDebugger();
134  };
135  std::string debugState();
136  bool debuggerAttached()
137  {
138  return m_debugState & DEBUG_ATTACHED;
139  };
140  // Global controls
141  bool allowControl()
142  {
143  return m_controlEnabled;
144  };
145  bool allowDebugger()
146  {
147  return m_debuggerEnabled;
148  };
149 
150  private:
151  void addConfigDefaults(DefaultConfigCategory& defaults);
152  bool loadPlugin();
153  int createTimerFd(struct timeval rate);
154  void createConfigCategories(DefaultConfigCategory configCategory,
155  std::string parent_name,
156  std::string current_name);
157  void throttlePoll();
158  void processNumberList(const ConfigCategory& cateogry, const std::string& item, std::vector<unsigned long>& list);
159  void calculateTimerRate();
160  bool syncToNextPoll();
161  bool onDemandPoll();
162  void checkPendingReconfigure();
163  void updateFeatures(const ConfigCategory& category);
164  void suspendIngest(bool suspend)
165  {
166  std::lock_guard<std::mutex> guard(m_suspendMutex);
167  m_suspendIngest = suspend;
168  m_steps = 0;
169  };
170  bool isSuspended()
171  {
172  std::lock_guard<std::mutex> guard(m_suspendMutex);
173  return m_suspendIngest;
174  };
175  bool willStep()
176  {
177  std::lock_guard<std::mutex> guard(m_suspendMutex);
178  if (m_suspendIngest && m_steps > 0)
179  {
180  m_steps--;
181  return true;
182  }
183  return false;
184  };
185  void getResourceLimit();
186  private:
187  std::thread *m_reconfThread;
188  std::deque<std::pair<std::string,std::string>> m_pendingNewConfig;
189  std::mutex m_pendingNewConfigMutex;
190  std::condition_variable m_cvNewReconf;
191 
192  SouthPlugin *southPlugin;
193  Logger *logger;
194  AssetTracker *m_assetTracker;
195  bool m_shutdown;
196  ConfigCategory m_config;
197  ConfigCategory m_configAdvanced;
198  ConfigCategory m_configResourceLimit;
199  unsigned long m_readingsPerSec; // May not be per second, new rate defines time units
200  unsigned int m_threshold;
201  unsigned long m_timeout;
202  Ingest *m_ingest;
203  bool m_throttle;
204  bool m_throttled;
205  unsigned int m_highWater;
206  unsigned int m_lowWater;
207  struct timeval m_lastThrottle;
208  struct timeval m_desiredRate;
209  struct timeval m_currentRate;
210  int m_timerfd;
211  const std::string m_token;
212  unsigned int m_repeatCnt;
213  PluginData *m_pluginData;
214  std::string m_dataKey;
215  bool m_dryRun;
216  bool m_requestRestart;
217  std::string m_rateUnits;
218  enum { POLL_INTERVAL, POLL_FIXED, POLL_ON_DEMAND }
219  m_pollType;
220  std::vector<unsigned long> m_hours;
221  std::vector<unsigned long> m_minutes;
222  std::vector<unsigned long> m_seconds;
223  std::string m_hoursStr;
224  std::string m_minutesStr;
225  std::string m_secondsStr;
226  std::condition_variable m_pollCV;
227  std::mutex m_pollMutex;
228  bool m_doPoll;
229  AuditLogger *m_auditLogger;
230  PerformanceMonitor *m_perfMonitor;
231  bool m_suspendIngest;
232  unsigned int m_steps;
233  std::mutex m_suspendMutex;
234  unsigned int m_debugState;
235  SouthServiceProvider *m_provider;
236  bool m_controlEnabled;
237  bool m_debuggerEnabled;
238  ServiceBufferingType m_serviceBufferingType;
239  unsigned int m_serviceBufferSize;
240  DiscardPolicy m_discardPolicy;
241 };
242 
248  public:
249  SouthServiceProvider(SouthService *south) : m_south(south) {};
250  virtual ~SouthServiceProvider() {};
251  void asJSON(std::string &json) const
252  {
253  if (m_south)
254  {
255  json = "\"debug\" : " + m_south->debugState();
256  }
257  };
258  private:
259  SouthService *m_south;
260 };
261 #endif
DefaultConfigCategory
DefaultConfigCategory.
Definition: config_category.h:241
JSONProvider
Definition: json_provider.h:14
SouthService::~SouthService
virtual ~SouthService()
Destructor for south service.
Definition: south.cpp:280
Ingest
The ingest class is used to ingest asset readings.
Definition: ingest.h:74
SouthService::debugState
std::string debugState()
Return the state of the pipeline debugger.
Definition: south.cpp:1962
SouthService::SouthService
SouthService(const std::string &name, const std::string &token="")
Constructor for the south service.
Definition: south.cpp:247
SouthService::processConfigChange
void processConfigChange(const std::string &, const std::string &)
Configuration change notification.
Definition: south.cpp:1068
SouthService::stop
void stop()
Stop the storage service/.
Definition: south.cpp:917
SouthService
The SouthService class.
Definition: south_service.h:57
SouthService::start
void start(std::string &coreAddress, unsigned short corePort)
Start the south service.
Definition: south.cpp:305
SouthService::restart
void restart()
Restart request.
Definition: south.cpp:1052
ConfigCategory
Definition: config_category.h:56
SouthService::shutdown
void shutdown()
Shutdown request.
Definition: south.cpp:1031
PluginData
Definition: plugin_data.h:15
SouthService::setPoint
bool setPoint(const std::string &name, const std::string &value)
Perform a setPoint operation on the south plugin.
Definition: south.cpp:1562
SouthService::configChange
void configChange(const std::string &, const std::string &)
Configuration change notification using a separate thread.
Definition: south.cpp:1307
SouthPlugin
Class that represents a south plugin.
Definition: south_plugin.h:33
SouthService::handlePendingReconf
void handlePendingReconf()
Handle configuration change notification; called by reconf thread Waits for some reconf operation(s) ...
Definition: south.cpp:1265
Logger
Fledge Logger class used to log to syslog.
Definition: logger.h:42
SouthServiceProvider
A data provider class to return data in the south service ping response.
Definition: south_service.h:247
AssetTracker
The AssetTracker class provides the asset tracking functionality.
Definition: asset_tracking.h:239
ServiceAuthHandler
ServiceAuthHandler adds security to the base class ServiceHandler.
Definition: service_handler.h:35
PerformanceMonitor
Class to handle the performance monitors.
Definition: perfmonitors.h:35
AuditLogger
A singleton class for access to the audit logger within services.
Definition: audit_logger.h:21
SouthService::operation
bool operation(const std::string &name, std::vector< PLUGIN_PARAMETER * > &)
Perform an operation on the south plugin.
Definition: south.cpp:1582