Fledge
An open source edge computing platform for industrial users
asset_tracking.h
1 #ifndef _ASSET_TRACKING_H
2 #define _ASSET_TRACKING_H
3 /*
4  * Fledge asset tracking related classes and definitions
5  *
6  * Copyright (c) 2018 OSisoft, LLC
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Amandeep Singh Arora, Massimiliano Pinto
11  */
12 #include <logger.h>
13 #include <vector>
14 #include <set>
15 #include <sstream>
16 #include <unordered_set>
17 #include <management_client.h>
18 #include <queue>
19 #include <thread>
20 #include <mutex>
21 #include <condition_variable>
22 #include <storage_client.h>
23 
24 #define MIN_ASSET_TRACKER_UPDATE 500 // The minimum interval for asset tracker updates
25 
30 public:
/* Default constructor; the body is empty. */
31  TrackingTuple() {};
/* Virtual, defaulted destructor so derived objects can be destroyed through a TrackingTuple pointer. */
32  virtual ~TrackingTuple() = default;
/*
 * Pure virtual hook that every concrete tuple type must implement.
 *
 * @param storage_connected	Boolean flag supplied by the caller
 * @param mgtClient		Pointer to the management client
 * @param warned		Boolean passed by reference, so the callee can change it
 * @param instanceName		String passed by non-const reference
 * @return			An InsertValues object
 *
 * NOTE(review): presumably the returned values are what gets written for
 * this tuple - confirm against the implementations, which are not in this header.
 */
33  virtual InsertValues processData(bool storage_connected,
34  ManagementClient *mgtClient,
35  bool &warned,
36  std::string &instanceName) = 0;
/* Pure virtual: return a string describing the tuple. */
37  virtual std::string assetToString() = 0;
38 };
39 
40 
48 
49 public:
/*
 * Build a one-line textual description of this tuple.
 *
 * The result has the form
 *   service:<s>, plugin:<p>, asset:<a>, event:<e>, deprecated:<d>
 * The deprecated flag is a bool streamed with the default stream
 * formatting, so it is rendered as 0 or 1.
 *
 * @return	The formatted string
 */
50  std::string assetToString()
51  {
52  std::ostringstream o;
53  o << "service:" << m_serviceName <<
54  ", plugin:" << m_pluginName <<
55  ", asset:" << m_assetName <<
56  ", event:" << m_eventName <<
57  ", deprecated:" << m_deprecated;
58  return o.str();
59  }
60 
/*
 * Equality comparison between two asset tracking tuples.
 *
 * Two tuples are equal when their service, plugin, asset and event
 * names all match. The deprecated flag takes no part in the comparison.
 *
 * @param x	The tuple to compare against
 * @return	true if the four name fields are equal
 */
61  inline bool operator==(const AssetTrackingTuple& x) const
62  {
63  return ( x.m_serviceName==m_serviceName &&
64  x.m_pluginName==m_pluginName &&
65  x.m_assetName==m_assetName &&
66  x.m_eventName==m_eventName);
67  };
68 
/*
 * Construct an asset tracking tuple; each argument is copied into the
 * member of the corresponding name.
 *
 * @param service	Service name
 * @param plugin	Plugin name
 * @param asset		Asset name
 * @param event		Event name
 * @param deprecated	Initial value of the deprecated flag, false if omitted
 */
69  AssetTrackingTuple(const std::string& service,
70  const std::string& plugin,
71  const std::string& asset,
72  const std::string& event,
73  const bool& deprecated = false) :
74  m_serviceName(service),
75  m_pluginName(plugin),
76  m_assetName(asset),
77  m_eventName(event),
78  m_deprecated(deprecated) {}
79 
/* Returns a non-const reference to the stored asset name, so the caller can modify it in place. */
80  std::string &getAssetName() { return m_assetName; };
/* The next three accessors return copies of the stored strings. */
81  std::string getPluginName() { return m_pluginName;}
82  std::string getEventName() { return m_eventName;}
83  std::string getServiceName() { return m_serviceName;}
/* Current value of the deprecated flag. */
84  bool isDeprecated() { return m_deprecated; };
/* Clear the deprecated flag. */
85  void unDeprecate() { m_deprecated = false; };
86 
/*
 * Declaration only; the definition is not part of this header.
 * The parameter list and return type match the pure virtual
 * TrackingTuple::processData.
 * NOTE(review): presumably this overrides TrackingTuple::processData - the
 * class header line is not visible in this listing, so confirm the base class.
 */
87  InsertValues processData(bool storage_connected,
88  ManagementClient *mgtClient,
89  bool &warned,
90  std::string &instanceName);
91 
92 public:
93  std::string m_serviceName;
94  std::string m_pluginName;
95  std::string m_assetName;
96  std::string m_eventName;
97 
98 private:
99  bool m_deprecated;
100 };
101 
/*
 * Compare two tuples held by pointer: both pointers are dereferenced and
 * AssetTrackingTuple::operator== decides the result, so equality is by
 * content rather than by address. Neither pointer is checked for null.
 */
103  bool operator()(AssetTrackingTuple const* a, AssetTrackingTuple const* b) const {
104  return *a == *b;
105  }
106 };
107 
108 namespace std
109 {
/*
 * std::hash specialisation for AssetTrackingTuple values.
 *
 * The hash is the std::string hash of the service, plugin, asset and
 * event names concatenated with no separator between them. These are the
 * same four fields that AssetTrackingTuple::operator== compares; the
 * deprecated flag is not hashed.
 */
110  template <>
111  struct hash<AssetTrackingTuple>
112  {
113  size_t operator()(const AssetTrackingTuple& t) const
114  {
115  return (std::hash<std::string>()(t.m_serviceName + t.m_pluginName + t.m_assetName + t.m_eventName));
116  }
117  };
118 
/*
 * std::hash specialisation for pointers to AssetTrackingTuple.
 *
 * The pointee's service, plugin, asset and event names are concatenated
 * with no separator and hashed as a std::string, so the hash depends on
 * the tuple's content and not on the pointer value. The pointer is
 * dereferenced without a null check.
 */
119  template <>
120  struct hash<AssetTrackingTuple*>
121  {
122  size_t operator()(AssetTrackingTuple* t) const
123  {
124  return (std::hash<std::string>()(t->m_serviceName + t->m_pluginName + t->m_assetName + t->m_eventName));
125  }
126  };
127 }
128 
130 public:
/*
 * Construct a storage asset tracking tuple; each argument is copied into
 * the corresponding member.
 *
 * @param service	Service name
 * @param plugin	Plugin name
 * @param asset		Asset name
 * @param event		Event name
 * @param deprecated	Initial value of the deprecated flag, false if omitted
 * @param datapoints	String stored in m_datapoints, empty if omitted
 * @param c		Value stored in m_maxCount, 0 if omitted
 */
131  StorageAssetTrackingTuple(const std::string& service,
132  const std::string& plugin,
133  const std::string& asset,
134  const std::string& event,
135  const bool& deprecated = false,
136  const std::string& datapoints = "",
137  unsigned int c = 0) : m_datapoints(datapoints),
138  m_maxCount(c),
139  m_serviceName(service),
140  m_pluginName(plugin),
141  m_assetName(asset),
142  m_eventName(event),
143  m_deprecated(deprecated)
144  {};
145 
/*
 * Equality comparison between two storage asset tracking tuples.
 *
 * Only the service, plugin, asset and event names are compared. The
 * datapoints string, the max count and the deprecated flag are ignored.
 *
 * @param x	The tuple to compare against
 * @return	true if the four name fields are equal
 */
146  inline bool operator==(const StorageAssetTrackingTuple& x) const
147  {
148  return ( x.m_serviceName==m_serviceName &&
149  x.m_pluginName==m_pluginName &&
150  x.m_assetName==m_assetName &&
151  x.m_eventName==m_eventName);
152  };
/*
 * Build a one-line textual description of this tuple.
 *
 * The result has the form
 *   service:<s>, plugin:<p>, asset:<a>, event:<e>, deprecated:<d>, m_datapoints:<dp>, m_maxCount:<n>
 * The deprecated flag is a bool streamed with the default stream
 * formatting, so it is rendered as 0 or 1.
 *
 * @return	The formatted string
 */
153  std::string assetToString()
154  {
155  std::ostringstream o;
156  o << "service:" << m_serviceName <<
157  ", plugin:" << m_pluginName <<
158  ", asset:" << m_assetName <<
159  ", event:" << m_eventName <<
160  ", deprecated:" << m_deprecated <<
161  ", m_datapoints:" << m_datapoints <<
162  ", m_maxCount:" << m_maxCount;
163  return o.str();
164  };
165 
/* Current value of the deprecated flag. */
166  bool isDeprecated() { return m_deprecated; };
167 
/* Value of m_maxCount as given to the constructor. */
168  unsigned int getMaxCount() { return m_maxCount; }
/* Copy of the stored datapoints string. */
169  std::string getDataPoints() { return m_datapoints; }
/* Clear the deprecated flag. */
170  void unDeprecate() { m_deprecated = false; };
/* Set the deprecated flag. */
171  void setDeprecate() { m_deprecated = true; };
172 
/*
 * Declaration only; the definition is not part of this header.
 * The parameter types and return type match the pure virtual
 * TrackingTuple::processData; only the first parameter's name differs
 * (storage here, storage_connected there).
 * NOTE(review): presumably this overrides TrackingTuple::processData - the
 * class header line is not visible in this listing, so confirm the base class.
 */
173  InsertValues processData(bool storage,
174  ManagementClient *mgtClient,
175  bool &warned,
176  std::string &instanceName);
177 
178 public:
179  std::string m_datapoints;
180  unsigned int m_maxCount;
181  std::string m_serviceName;
182  std::string m_pluginName;
183  std::string m_assetName;
184  std::string m_eventName;
185 
186 private:
187  bool m_deprecated;
188 };
189 
/*
 * Compare two storage tuples held by pointer: both pointers are
 * dereferenced and StorageAssetTrackingTuple::operator== decides the
 * result, so equality is by content rather than by address. Neither
 * pointer is checked for null.
 */
191  bool operator()(StorageAssetTrackingTuple const* a, StorageAssetTrackingTuple const* b) const {
192  return *a == *b;
193  }
194 };
195 
196 namespace std
197 {
/*
 * Hash functor taking a StorageAssetTrackingTuple by const reference.
 *
 * The hash is the std::string hash of the service, plugin, asset and
 * event names concatenated with no separator. These are the same four
 * fields that StorageAssetTrackingTuple::operator== compares; the
 * datapoints, max count and deprecated flag are not hashed.
 *
 * NOTE(review): the struct header (original source line 199) is absent
 * from this listing; presumably it is the std::hash specialisation for
 * StorageAssetTrackingTuple - confirm against the real header.
 */
198  template <>
200  {
201  size_t operator()(const StorageAssetTrackingTuple& t) const
202  {
203  return (std::hash<std::string>()(t.m_serviceName +
204  t.m_pluginName +
205  t.m_assetName +
206  t.m_eventName));
207  }
208  };
209 
/*
 * Hash functor taking a pointer to StorageAssetTrackingTuple.
 *
 * The pointee's service, plugin, asset and event names are concatenated
 * with no separator and hashed as a std::string, so the hash depends on
 * the tuple's content and not on the pointer value. The pointer is
 * dereferenced without a null check.
 *
 * NOTE(review): the struct header (original source line 211) is absent
 * from this listing; presumably it is the std::hash specialisation for
 * StorageAssetTrackingTuple* - confirm against the real header.
 */
210  template <>
212  {
213  size_t operator()(StorageAssetTrackingTuple* t) const
214  {
215  return (std::hash<std::string>()(t->m_serviceName +
216  t->m_pluginName +
217  t->m_assetName +
218  t->m_eventName));
219  }
220  };
221 }
222 
/*
 * Map type keyed by StorageAssetTrackingTuple pointer, with a set of
 * strings as the mapped value. Keys are compared with
 * StorageAssetTrackingTuplePtrEqual, i.e. by tuple content.
 *
 * NOTE(review): original source lines 225 and 229 are absent from this
 * listing; the hash template argument presumably sits there - confirm
 * against the real header.
 * NOTE(review): <unordered_map> is not among the includes visible at the
 * top of this listing; presumably it is pulled in by another header - confirm.
 */
223 typedef std::unordered_map<StorageAssetTrackingTuple*,
224  std::set<std::string>,
226  StorageAssetTrackingTuplePtrEqual> StorageAssetCacheMap;
/* Iterator type for the map type above. */
227 typedef std::unordered_map<StorageAssetTrackingTuple*,
228  std::set<std::string>,
230  StorageAssetTrackingTuplePtrEqual>::iterator StorageAssetCacheMapItr;
231 
232 class ManagementClient;
233 
240 
241 public:
/*
 * Public interface declarations. All functions in this group are declared
 * only; their definitions are not part of this header.
 */
/* Construct a tracker from a management client pointer and a service name. */
242  AssetTracker(ManagementClient *mgtClient, std::string service);
243  ~AssetTracker();
/* Static accessor returning an AssetTracker pointer (presumably the static 'instance' member - confirm). */
244  static AssetTracker *getAssetTracker();
245  void populateAssetTrackingCache(std::string plugin, std::string event);
246  void populateStorageAssetTrackingCache();
247  bool checkAssetTrackingCache(AssetTrackingTuple& tuple);
/* NOTE(review): the return type (original source line 248) is absent from this listing. */
249  findAssetTrackingCache(AssetTrackingTuple& tuple);
/* Two overloads: one takes a ready-made tuple, the other plugin, asset and event names. */
250  void addAssetTrackingTuple(AssetTrackingTuple& tuple);
251  void addAssetTrackingTuple(std::string plugin, std::string asset, std::string event);
/* addObj defaults to false when omitted. */
252  void addStorageAssetTrackingTuple(StorageAssetTrackingTuple& tuple,
253  std::set<std::string>& dpSet,
254  bool addObj = false);
/* NOTE(review): the return type (original source line 255) is absent from this listing. */
256  findStorageAssetTrackingCache(StorageAssetTrackingTuple& tuple);
/*
 * Convenience wrapper: calls the private getService() with the event
 * name "Ingest" and the given asset name, and returns its result.
 */
257  std::string
258  getIngestService(const std::string& asset)
259  {
260  return getService("Ingest", asset);
261  };
/*
 * Convenience wrapper: calls the private getService() with the event
 * name "Egress" and the given asset name, and returns its result.
 */
262  std::string
263  getEgressService(const std::string& asset)
264  {
265  return getService("Egress", asset);
266  };
/* Declared only. NOTE(review): presumably the entry point run on m_thread - confirm in the implementation. */
267  void workerThread();
268 
/* Declarations only; the definitions are not part of this header. */
269  bool getDeprecated(StorageAssetTrackingTuple* ptr);
/* Note that dpSet is taken by value, so the caller's set is copied. */
270  void updateCache(std::set<std::string> dpSet, StorageAssetTrackingTuple* ptr);
/* Returns a pointer to a set of strings for the given tuple pointer. */
271  std::set<std::string>
272  *getStorageAssetTrackingCacheData(StorageAssetTrackingTuple* tuple);
/* NOTE(review): presumably adjusts m_updateInterval; units and limits are not visible here - confirm. */
273  bool tune(unsigned long updateInterval);
274 
275 private:
/*
 * Private helpers; declared only, defined outside this header.
 */
/* Called by getIngestService() and getEgressService() with the event names "Ingest" and "Egress". */
276  std::string
277  getService(const std::string& event, const std::string& asset);
/* Takes a TrackingTuple pointer. NOTE(review): presumably appends it to m_pending - confirm. */
278  void queue(TrackingTuple *tuple);
279  void processQueue();
/* Takes a datapoints string and returns a set of strings; the expected string format is not visible here. */
280  std::set<std::string>
281  getDataPointsSet(std::string strDatapoints);
282  bool getFledgeConfigInfo();
283 
284 private:
/* Static AssetTracker pointer shared by the whole class. */
285  static AssetTracker *instance;
/* Same types as the constructor's two arguments; presumably those arguments are stored here - confirm. */
286  ManagementClient *m_mgtClient;
287  std::string m_service;
/* Set of tuple pointers that hashes and compares by tuple content, not by address. */
288  std::unordered_set<AssetTrackingTuple*, std::hash<AssetTrackingTuple*>, AssetTrackingTuplePtrEqual>
289  assetTrackerTuplesCache;
290  std::queue<TrackingTuple *> m_pending; // Tuples that are not yet written to the storage
/* Thread handle, shutdown flag, condition variable and mutex; how they interact is defined in the implementation. */
291  std::thread *m_thread;
292  bool m_shutdown;
293  std::condition_variable m_cv;
294  std::mutex m_mutex;
295  std::string m_fledgeName;
296  StorageClient *m_storageClient;
/* Map from storage tuple pointer to a set of strings (see the StorageAssetCacheMap typedef). */
297  StorageAssetCacheMap storageAssetTrackerTuplesCache;
/* NOTE(review): units not visible here; see MIN_ASSET_TRACKER_UPDATE for the related minimum - confirm. */
298  unsigned int m_updateInterval;
299 };
300 
306  public:
/*
 * Name-based operations on the table; declared only, defined outside
 * this header.
 * NOTE(review): which tuple field is used as the name key, and who owns
 * the stored pointers, cannot be determined from this header - confirm.
 */
309  void add(AssetTrackingTuple *tuple);
310  void remove(const std::string& name);
311  AssetTrackingTuple *find(const std::string& name);
312  private:
/*
 * Ordered map from a name string to a tuple pointer.
 * NOTE(review): <map> is not among the includes visible at the top of
 * this listing; presumably it is pulled in by another header - confirm.
 */
313  std::map<std::string, AssetTrackingTuple *>
314  m_tuples;
315 };
316 
317 #endif
A class to hold a set of asset tracking tuples that allows lookup by name.
Definition: asset_tracking.h:305
Definition: asset_tracking.h:102
The AssetTrackingTuple class is used to represent an asset tracking tuple.
Definition: asset_tracking.h:47
Tracking abstract base class to be passed in the process data queue.
Definition: asset_tracking.h:29
Definition: asset_tracking.h:190
Definition: asset_tracking.h:108
The management client class used by services and tasks to communicate with the management API of the ...
Definition: management_client.h:43
Client for accessing the storage service.
Definition: storage_client.h:43
The AssetTracker class provides the asset tracking functionality.
Definition: asset_tracking.h:239
Definition: insert.h:145
Definition: asset_tracking.h:211
Definition: asset_tracking.h:129