Fledge
An open source edge computing platform for industrial users
omf.h
1 #ifndef _OMF_H
2 #define _OMF_H
3 /*
4  * Fledge OSIsoft OMF interface to PI Server.
5  *
6  * Copyright (c) 2018-2022 Dianomic Systems
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Massimiliano Pinto
11  */
12 #include <string>
13 #include <vector>
14 #include <map>
15 #include <unordered_map>
16 #include <reading.h>
17 #include <http_sender.h>
18 #include <zlib.h>
19 #include <rapidjson/document.h>
20 #include <omfbuffer.h>
21 #include <omferror.h>
22 #include <linkedlookup.h>
23 
24 #define OMF_HINT "OMFHint"
25 
26 #define PIWEBAPI_PIPOINTS_NOT_CREATED "One or more PI Points could not be created."
27 #define PIWEBAPI_CONTAINER_NOT_FOUND "Container not found."
28 #define PIWEBAPI_UPDATE_EXCEPTION "An exception occurred while updating data."
29 #define MESSAGE_PI_UNSTABLE "HTTP Code %d: Processing cannot continue until data archive errors are corrected"
30 #define MESSAGE_UNAUTHORIZED "OMF endpoint reported we are not authorized, please check configuration of the authentication method and credentials"
31 
32 const char *const noConnectionErrorMessages[] =
33  {"Failed to send data: Operation canceled", // PI Web API
34  "Failed to send data: Connection refused", // Edge Data Store
35  "Failed to send data: Host not found", // usually followed by "(authoritative)" or "(non-authoritative), try again later"
36  "Failed to send data: Network is unreachable",
37  ""}; // empty string marks the end of the array
38 
39 // The following will force the OMF version for EDS endpoints
40 // Remove or comment out the line below to prevent the forcing
41 // of the version
42 #define EDS_OMF_VERSION "1.0"
43 #define CR_OMF_VERSION "1.0"
44 
45 
46 #define TYPE_ID_DEFAULT 1
47 #define FAKE_ASSET_KEY "_default_start_id_"
48 #define OMF_TYPE_STRING "string"
49 #define OMF_TYPE_INTEGER "integer"
50 #define OMF_TYPE_FLOAT "number"
51 #define OMF_TYPE_UNSUPPORTED "unsupported"
52 
53 enum OMF_ENDPOINT {
54  ENDPOINT_PIWEB_API,
55  ENDPOINT_CR,
56  ENDPOINT_OCS,
57  ENDPOINT_EDS,
58  ENDPOINT_ADH
59 };
60 
61 // Documentation about the Naming Scheme available at: https://fledge-iot.readthedocs.io/en/latest/OMF.html#naming-scheme
62 enum NAMINGSCHEME_ENDPOINT {
63  NAMINGSCHEME_CONCISE,
64  NAMINGSCHEME_SUFFIX,
65  NAMINGSCHEME_HASH,
66  NAMINGSCHEME_COMPATIBILITY
67 };
68 
69 
70 using namespace std;
71 using namespace rapidjson;
72 
73 std::string ApplyPIServerNamingRules(const std::string &objName, bool *changed);
74 std::string DataPointNamesAsString(const Reading& reading);
75 
90 {
91  public:
92  long typeId;
93  std::string types;
94  unsigned long typesShort;
95  long namingScheme;
96  string afhHash;
97  string afHierarchy;
98  string afHierarchyOrig;
99 
100  unsigned short hintChkSum;
101 };
102 
103 class OMFHints;
104 
109 class OMF
110 {
111  public:
116  OMF(const std::string& name,
117  HttpSender& sender,
118  const std::string& path,
119  const long typeId,
120  const std::string& producerToken);
121 
122  OMF(const std::string& name,
123  HttpSender& sender,
124  const std::string& path,
125  std::map<std::string, OMFDataTypes>& types,
126  const std::string& producerToken);
127 
128  // Destructor
129  ~OMF();
130 
131  void setOMFVersion(std::string& omfversion)
132  {
133  m_OMFVersion = omfversion;
134  if (omfversion.compare("1.0") == 0
135  || omfversion.compare("1.1") == 0)
136  {
137  m_linkedProperties = false;
138  }
139  else
140  {
141  m_linkedProperties = true;
142  }
143  };
144 
145  void setSender(HttpSender& sender)
146  {
147  m_sender = sender;
148  };
149 
163  // Method with vector (by reference) of readings
164  uint32_t sendToServer(const std::vector<Reading>& readings,
165  bool skipSentDataTypes = true); // never called
166 
167  // Method with vector (by reference) of reading pointers
168  uint32_t sendToServer(const std::vector<Reading *>& readings,
169  bool compression, bool skipSentDataTypes = true);
170 
171  // Send a single reading (by reference)
172  uint32_t sendToServer(const Reading& reading,
173  bool skipSentDataTypes = true); // never called
174 
175  // Send a single reading pointer
176  uint32_t sendToServer(const Reading* reading,
177  bool skipSentDataTypes = true); // never called
178 
179  // Set saved OMF formats
180  void setFormatType(const std::string &key, std::string &value);
181 
182  // Set which PIServer component should be used for the communication
183  void setPIServerEndpoint(const OMF_ENDPOINT PIServerEndpoint);
184 
185  // Set the naming scheme of the objects in the endpoint
186  void setNamingScheme(const NAMINGSCHEME_ENDPOINT namingScheme) {m_NamingScheme = namingScheme;};
187 
188  // Generate the container id for the given asset
189  std::string generateMeasurementId(const string& assetName);
190 
191  // Generate a suffix for the given asset in relation to the selected naming schema and the value of the type id
192  std::string generateSuffixType(string &assetName, long typeId);
193 
194  // Generate a suffix for the given asset in relation to the selected naming schema and the value of the type id
195  long getNamingScheme(const string& assetName);
196 
197  string getHashStored(const string& assetName);
198  string getPathStored(const string& assetName);
199  string getPathOrigStored(const string& assetName);
200  bool setPathStored(const string& assetName, string &afHierarchy);
201  bool deleteAssetAFH(const string& assetName, string& path);
202  bool createAssetAFH(const string& assetName, string& path);
203 
204  // Set the first level of hierarchy in Asset Framework in which the assets will be created, PI Web API only.
205  void setDefaultAFLocation(const std::string &DefaultAFLocation);
206 
207  bool setAFMap(const std::string &AFMap);
208 
209  void setSendFullStructure(const bool sendFullStructure) {m_sendFullStructure = sendFullStructure;};
210 
211  void setPrefixAFAsset(const std::string &prefixAFAsset);
212 
213  void setDelimiter(const std::string &delimiter) {m_delimiter = delimiter;};
214 
215  void setDataActionCode(const std::string &actionCode) {m_dataActionCode = actionCode;};
216 
217  // Get saved OMF formats
218  std::string getFormatType(const std::string &key) const;
219 
220  // Set the list of errors considered not blocking
221  // in the communication with the PI Server
222  void setNotBlockingErrors(std::vector<std::string>& );
223 
224  // Compress string using gzip
225  std::string compress_string(const std::string& str,
226  int compressionlevel = Z_DEFAULT_COMPRESSION);
227 
228  // Return current value of global type-id
229  const long getTypeId() const { return m_typeId; };
230 
231  // Check DataTypeError
232  bool isDataTypeError(const char* message);
233 
234  // Check if plugin configuration is working and PI is stable
235  bool isPIstable() { return m_PIstable; };
236 
237  // Check if plugin is connected to PI
238  bool isPIconnected() { return m_connected; };
239 
240  // Set PI connection status
241  void setPIconnected(bool connectionStatus) { m_connected = connectionStatus; };
242 
243  // Map object types found in input data
244  void setMapObjectTypes(const std::vector<Reading *>& data,
245  std::map<std::string, Reading*>& dataSuperSet);
246  // Removed mapped object types found in input data
247  void unsetMapObjectTypes(std::map<std::string, Reading*>& dataSuperSet) const;
248 
249  void setStaticData(std::vector<std::pair<std::string, std::string>> *staticData)
250  {
251  m_staticData = staticData;
252  };
253 
254  void generateAFHierarchyPrefixLevel(string& path, string& prefix, string& AFHierarchyLevel);
255 
256  // Retrieve private objects
257  map<std::string, std::string> getNamesRules() const { return m_NamesRules; };
258  map<std::string, std::string> getMetadataRulesExist() const { return m_MetadataRulesExist; };
259 
260  bool getAFMapEmptyNames() const { return m_AFMapEmptyNames; };
261  bool getAFMapEmptyMetadata() const { return m_AFMapEmptyMetadata; };
262 
263  void setLegacyMode(bool legacy) { m_legacy = legacy; };
264 
265  static std::string ApplyPIServerNamingRulesObj(const std::string &objName, bool *changed);
266  static std::string ApplyPIServerNamingRulesPath(const std::string &objName, bool *changed);
267  static std::string ApplyPIServerNamingRulesInvalidChars(const std::string &objName, bool *changed);
268 
269  static std::string variableValueHandle(const Reading& reading, std::string &AFHierarchy);
270  static bool extractVariable(string &strToHandle, string &variable, string &value, string &defaultValue);
271  static void reportAsset(const string& asset, const string& level, const string& msg);
272 
273 private:
279  const std::vector<std::pair<std::string, std::string>>
280  createMessageHeader(const std::string& type, const std::string& action="create") const;
281 
282  // Create data for Type message for current row
283  const std::string createTypeData(const Reading& reading, OMFHints *hints);
284 
285  // Create data for Container message for current row
286  const std::string createContainerData(const Reading& reading, OMFHints *hints);
287 
288  // Create data for additional type message, with 'Data' for current row
289  const std::string createStaticData(const Reading& reading);
290 
291  // Create data Link message, with 'Data', for current row
292  std::string createLinkData(const Reading& reading, std::string& AFHierarchyLevel, std::string& prefix, std::string& objectPrefix, OMFHints *hints, bool legacy);
293 
299  const std::string createMessageData(Reading& reading);
300 
301  // Set the the tagName in an assetName Type message
302  void setAssetTypeTag(const std::string& assetName,
303  const std::string& tagName,
304  std::string& data);
305 
306  void setAssetTypeTagNew(const std::string& assetName,
307  const std::string& tagName,
308  std::string& data);
309 
310  // Create the OMF data types if needed
311  bool handleDataTypes(const string keyComplete,
312  const Reading& row,
313  bool skipSendingTypes, OMFHints *hints);
314 
315  // Send OMF data types
316  bool sendDataTypes(const Reading& row, OMFHints *hints);
317 
318  // Get saved dataType
319  bool getCreatedTypes(const std::string& keyComplete, const Reading& row, OMFHints *hints);
320 
321  // Set saved dataType
322  unsigned long calcTypeShort(const Reading& row);
323 
324  // Clear data types cache
325  void clearCreatedTypes();
326 
327  // Increment type-id value
328  void incrementTypeId();
329 
330  // Handle data type errors
331  bool handleTypeErrors(const string& keyComplete, const Reading& reading, OMFHints*hints);
332 
333  string errorMessageHandler(const string &msg);
334 
335  void handleRESTException(const std::exception &e, const char *mainMessage);
336 
337  void CheckHttpCode(const int httpCode, const std::string &errorMessage);
338 
339  std::string getExceptionMessage(const std::exception &e, OMFError *error);
340 
341  // Extract assetName from error message
342  std::string getAssetNameFromError(const char* message);
343 
344  // Get asset type-id from cached data
345  long getAssetTypeId(const std::string& assetName);
346 
347  // Increment per asset type-id value
348  void incrementAssetTypeId(const std::string& keyComplete);
349  void incrementAssetTypeIdOnly(const std::string& keyComplete);
350 
351  // Set global type-id as the maximum value of all per asset type-ids
352  void setTypeId();
353 
354  // Set saved dataType
355  bool setCreatedTypes(const Reading& row, OMFHints *hints);
356 
357  // Remove cached data types entry for given asset name
358  void clearCreatedTypes(const std::string& keyComplete);
359 
360  // Add the 1st level of AF hierarchy if the end point is PI Web API
361  void setAFHierarchy();
362 
363  bool handleAFHierarchy();
364  bool handleAFHierarchySystemWide();
365  bool handleOmfHintHierarchies();
366 
367  bool sendAFHierarchy(std::string AFHierarchy);
368 
369  bool sendAFHierarchyLevels(std::string parentPath, std::string path, std::string &lastLevel);
370  bool sendAFHierarchyTypes(const std::string AFHierarchyLevel, const std::string prefix);
371  bool sendAFHierarchyStatic(const std::string AFHierarchyLevel, const std::string prefix);
372  bool sendAFHierarchyLink(std::string parent, std::string child, std::string prefixIdParent, std::string prefixId);
373 
374  bool manageAFHierarchyLink(std::string parent, std::string child, std::string prefixIdParent, std::string prefixId, std::string childFull, string action);
375 
376  bool AFHierarchySendMessage(const std::string& msgType, std::string& jsonData, const std::string& action="create");
377 
378 
379  std::string generateUniquePrefixId(const std::string &path);
380  bool evaluateAFHierarchyRules(const string& assetName, const Reading& reading);
381  void retrieveAFHierarchyPrefixAssetName(const string& assetName, string& prefix, string& AFHierarchyLevel);
382  void retrieveAFHierarchyFullPrefixAssetName(const string& assetName, string& prefix, string& AFHierarchy);
383 
384  bool createAFHierarchyOmfHint(const string& assetName, const string &OmfHintHierarchy);
385 
386  bool HandleAFMapNames(Document& JSon);
387  bool HandleAFMapMetedata(Document& JSon);
388 
389  // Start of support for using linked containers
390  bool sendBaseTypes();
391  bool sendAFLinks(Reading& reading, OMFHints *hints);
392  // End of support for using linked containers
393  //
394  string createAFLinks(Reading &reading, OMFHints *hints);
395 
396 
397  private:
398  // Use for the evaluation of the OMFDataTypes.typesShort
399  union t_typeCount {
400  struct
401  {
402  unsigned char tTotal;
403  unsigned char tFloat;
404  unsigned char tString;
405  unsigned char spare0;
406 
407  unsigned char spare1;
408  unsigned char spare2;
409  unsigned char spare3;
410  unsigned char spare4;
411  } cnt;
412  unsigned long valueLong = 0;
413  };
414 
415  std::string m_assetName;
416  const std::string m_path;
417  long m_typeId;
418  const std::string m_producerToken;
419  OMF_ENDPOINT m_PIServerEndpoint;
420  NAMINGSCHEME_ENDPOINT m_NamingScheme;
421  std::string m_DefaultAFLocation;
422  bool m_sendFullStructure; // If disabled the AF hierarchy is not created.
423  std::string m_delimiter;
424  std::string m_dataActionCode;
425 
426  // Asset Framework Hierarchy Rules handling - Metadata MAP
427  // Documentation: https://fledge-iot.readthedocs.io/en/latest/plugins/fledge-north-OMF/index.html?highlight=hierarchy#asset-framework-hierarchy-rules
428  std::string m_AFMap;
429  bool m_AFMapEmptyNames; // true if there are no rules to manage
430  bool m_AFMapEmptyMetadata;
431  std::string m_AFHierarchyLevel;
432  std::string m_prefixAFAsset;
433 
434  vector<std::string> m_afhHierarchyAlreadyCreated={
435 
436  // Asset Framework path
437  // {""}
438  };
439 
440  map<std::string, std::string> m_NamesRules={
441 
442  // Asset_name - Asset Framework path
443  // {"", ""}
444  };
445 
446  map<std::string, std::string> m_MetadataRulesExist={
447 
448  // Property - Asset Framework path
449  // {"", ""}
450  };
451 
452  map<std::string, std::string> m_MetadataRulesNonExist={
453 
454  // Property - Asset Framework path
455  // {"", ""}
456  };
457 
458  map<std::string, vector<pair<string, string>>> m_MetadataRulesEqual={
459 
460  // Property - Value - Asset Framework path
461  // {"", {{"", ""}} }
462  };
463 
464  map<std::string, vector<pair<string, string>>> m_MetadataRulesNotEqual={
465 
466  // Property - Value - Asset Framework path
467  // {"", {{"", ""}} }
468  };
469 
470  map<std::string, vector<pair<string, string>>> m_AssetNamePrefix ={
471 
472  // Property - Hierarchy - prefix
473  // {"", {{"", ""}} }
474  };
475 
476  // Define the OMF format to use for each type
477  // the format will not be applied if the string is empty
478  std::map<const std::string, std::string> m_formatTypes {
479  {OMF_TYPE_STRING, ""},
480  {OMF_TYPE_INTEGER,"int64"},
481  {OMF_TYPE_FLOAT, "float64"},
482  {OMF_TYPE_UNSUPPORTED, "unsupported"}
483  };
484 
485  // Vector with OMF_TYPES
486  const std::vector<std::string> omfTypes = { OMF_TYPE_STRING,
487  OMF_TYPE_FLOAT, // Forces the creation of float also for integer numbers
488  OMF_TYPE_FLOAT,
489  OMF_TYPE_UNSUPPORTED};
490  // HTTP Sender interface
491  HttpSender& m_sender;
492  bool m_changeTypeId;
493 
494  // These errors are considered not blocking in the communication
495  // with the destination, the sending operation will proceed
496  // with the next block of data if one of these is encountered
497  std::vector<std::string> m_notBlockingErrors;
498 
499  // Data types cache[key] = (key_type_id, key data types)
500  std::map<std::string, OMFDataTypes>* m_OMFDataTypes;
501 
502  // Stores the type for the block of data containing all the used properties
503  std::map<string, Reading*> m_SuperSetDataPoints;
504 
508  std::vector<std::pair<std::string, std::string>> *m_staticData;
509 
510 
514  std::string m_OMFVersion;
515 
519  bool m_linkedProperties;
520 
527  std::unordered_map<std::string, LALookup>
528  m_linkedAssetState;
529 
533  bool m_legacy;
534 
539  static std::vector<std::string>
540  m_reportedAssets;
544  const std::string m_name;
545 
549  bool m_baseTypesSent;
550 
555  bool m_PIstable;
556 
560  bool m_connected;
561 };
562 
571 class OMFData
572 {
573  public:
574  OMFData(OMFBuffer & payload,
575  const Reading& reading,
576  string measurementId,
577  bool needDelim,
578  const OMF_ENDPOINT PIServerEndpoint = ENDPOINT_CR,
579  const std::string& DefaultAFLocation = std::string(),
580  OMFHints *hints = NULL);
581  bool hasData() { return m_hasData; };
582  private:
583  bool m_hasData;
584 };
585 
586 #endif
An encapsulation of an error return from an OMF call.
Definition: omferror.h:21
The OMF class.
Definition: omf.h:109
Definition: asset_tracking.h:108
A set of hints for a reading.
Definition: OMFHint.h:151
An asset reading represented as a class.
Definition: reading.h:33
The OMFData class.
Definition: omf.h:571
Definition: http_sender.h:20
Buffer class designed to hold OMF payloads that can grow as required but have minimal copy semantics...
Definition: omfbuffer.h:24
Per asset dataTypes - This class is used in a std::map where assetName is a key.
Definition: omf.h:89