Fledge
An open source edge computing platform for industrial users
omf.h
1 #ifndef _OMF_H
2 #define _OMF_H
3 /*
4  * Fledge OSIsoft OMF interface to PI Server.
5  *
6  * Copyright (c) 2018-2025 Dianomic Systems
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Massimiliano Pinto
11  */
12 #include <string>
13 #include <vector>
14 #include <map>
15 #include <unordered_map>
16 #include <reading.h>
17 #include <http_sender.h>
18 #include <zlib.h>
19 #include <rapidjson/document.h>
20 #include <omfbuffer.h>
21 #include <omferror.h>
22 #include <linkedlookup.h>
23 
// Name used to identify OMF hints.
// NOTE(review): presumably the datapoint name carrying the hint in a reading - confirm against OMFHint.h.
24 #define OMF_HINT "OMFHint"
25 
// Message texts associated with PI Web API error conditions.
// NOTE(review): presumably matched against the text returned by the endpoint,
// so the wording must not be altered - confirm in omf.cpp.
26 #define PIWEBAPI_PIPOINTS_NOT_CREATED "One or more PI Points could not be created."
27 #define PIWEBAPI_CONTAINER_NOT_FOUND "Container not found."
28 #define PIWEBAPI_UPDATE_EXCEPTION "An exception occurred while updating data."
// printf-style format string: takes one integer argument (%d), the HTTP status code.
29 #define MESSAGE_PI_UNSTABLE "HTTP Code %d: Processing cannot continue until data archive errors are corrected"
// Fixed text describing an authorization failure reported by the OMF endpoint.
30 #define MESSAGE_UNAUTHORIZED "OMF endpoint reported we are not authorized, please check configuration of the authentication method and credentials"
31 
// Error message prefixes that indicate the endpoint could not be reached.
// The list has no explicit length: it is terminated by an empty string, so any
// loop over it must stop at the first entry whose first character is '\0'.
// Being a const object at namespace scope in a header, the array has internal
// linkage: every translation unit that includes this file gets its own copy.
32 const char *const noConnectionErrorMessages[] =
33 {"Failed to send data: Operation canceled", // PI Web API
34 "Failed to send data: Connection refused", // Edge Data Store
35 "Failed to send data: Host not found", // usually followed by "(authoritative)" or "(non-authoritative), try again later"
36 "Failed to send data: Network is unreachable",
37 ""}; // empty string marks the end of the array
38 
39 // The following will force the OMF version for EDS endpoints.
40 // Remove or comment out the EDS_OMF_VERSION line below to prevent the
41 // forcing of the version.
42 #define EDS_OMF_VERSION "1.0"
// NOTE(review): CR_OMF_VERSION is presumably the equivalent forced version for
// ENDPOINT_CR endpoints - confirm where it is used before removing it.
43 #define CR_OMF_VERSION "1.0"
44 
45 
// Default value for the type-id.
// NOTE(review): presumably the initial value of the global type-id used when naming OMF types - confirm in omf.cpp.
46 #define TYPE_ID_DEFAULT 1
// NOTE(review): looks like a reserved key (not a real asset name) for the data
// types cache - confirm in omf.cpp.
47 #define FAKE_ASSET_KEY "_default_start_id_"
// Type names used as keys of OMF::m_formatTypes and as entries of OMF::omfTypes.
48 #define OMF_TYPE_STRING "string"
49 #define OMF_TYPE_INTEGER "integer"
50 #define OMF_TYPE_FLOAT "number"
51 #define OMF_TYPE_UNSUPPORTED "unsupported"
52 
// Kind of OMF endpoint the plugin is sending to.
// This is an unscoped enum: the enumerators are visible at global scope and
// take the implicit values 0..4 in declaration order.
53 enum OMF_ENDPOINT {
54 ENDPOINT_PIWEB_API,	// PI Web API
55 ENDPOINT_CR,		// NOTE(review): presumably Connector Relay - confirm
56 ENDPOINT_OCS,		// NOTE(review): presumably OSIsoft Cloud Services - confirm
57 ENDPOINT_EDS,		// Edge Data Store
58 ENDPOINT_ADH		// NOTE(review): presumably AVEVA Data Hub - confirm
59 };
60 
61 // Documentation about the Naming Scheme available at: https://fledge-iot.readthedocs.io/en/latest/OMF.html#naming-scheme
// Selects how object names are built in the endpoint. Unscoped enum with the
// implicit values 0..3 in declaration order; the meaning of each scheme is
// described in the documentation linked above.
62 enum NAMINGSCHEME_ENDPOINT {
63 NAMINGSCHEME_CONCISE,
64 NAMINGSCHEME_SUFFIX,
65 NAMINGSCHEME_HASH,
66 NAMINGSCHEME_COMPATIBILITY
67 };
68 
69 
// NOTE(review): using-directives at header scope are injected into every file
// that includes this header. They cannot simply be removed: the declarations
// below use unqualified names (string, map, vector, pair, Document).
70 using namespace std;
71 using namespace rapidjson;
72 
// Returns a name derived from objName; 'changed' is an output flag.
// NOTE(review): presumably *changed is set to true when the returned name
// differs from objName, and whether a null pointer is accepted is not visible
// here - confirm in the implementation.
73 std::string ApplyPIServerNamingRules(const std::string &objName, bool *changed);
// Returns the datapoint names of the reading as a single string.
// NOTE(review): separator and ordering are defined by the implementation - confirm.
74 std::string DataPointNamesAsString(const Reading& reading);
75 
90 {
91  public:
92  long typeId;
93  std::string types;
94  unsigned long typesShort;
95  long namingScheme;
96  string afhHash;
97  string afHierarchy;
98  string afHierarchyOrig;
99 
100  unsigned short hintChkSum;
101 };
102 
103 class OMFHints;
104 
// OMF sender: builds OMF messages from Fledge readings and sends them through
// an HttpSender (see the file header: "Fledge OSIsoft OMF interface to PI Server").
// Only the inline members are defined in this header; everything else is
// defined in the implementation file.
109 class OMF
110 {
111 public:
    // Construct with a starting type-id.
    // NOTE(review): the arguments presumably initialise m_name, m_sender, m_path,
    // m_typeId and m_producerToken - confirm in omf.cpp.
116 OMF(const std::string& name,
117 HttpSender& sender,
118 const std::string& path,
119 const long typeId,
120 const std::string& producerToken);
121 
    // Construct with an externally owned data types cache.
    // NOTE(review): 'types' is taken by non-const reference and m_OMFDataTypes is a
    // pointer, so the caller's map presumably has to outlive this object - confirm.
122 OMF(const std::string& name,
123 HttpSender& sender,
124 const std::string& path,
125 std::map<std::string, OMFDataTypes>& types,
126 const std::string& producerToken);
127 
128 // Destructor
129 ~OMF();
130 
    // Store the OMF version and derive whether linked properties are used:
    // they are disabled for exactly "1.0" and "1.1" and enabled for any other
    // string (no validation of the version text is done here).
    // The parameter is a non-const reference although it is only read, so
    // callers cannot pass a temporary or a const string.
131 void setOMFVersion(std::string& omfversion)
132 {
133 m_OMFVersion = omfversion;
134 if (omfversion.compare("1.0") == 0
135 || omfversion.compare("1.1") == 0)
136 {
137 m_linkedProperties = false;
138 }
139 else
140 {
141 m_linkedProperties = true;
142 }
143 };
144 
    // NOTE(review): m_sender is declared as 'HttpSender&'. A reference cannot be
    // rebound, so this statement assigns 'sender' INTO the object m_sender already
    // refers to (HttpSender's assignment operator) instead of switching to the new
    // sender. Confirm this is the intent; rebinding would need a pointer member.
145 void setSender(HttpSender& sender)
146 {
147 m_sender = sender;
148 };
149 
    // NOTE(review): the sendToServer() overloads return a uint32_t, presumably the
    // number of readings sent - confirm in omf.cpp.
163 // Method with vector (by reference) of readings
164 uint32_t sendToServer(const std::vector<Reading>& readings,
165 bool skipSentDataTypes = true); // never called
166 
167 // Method with vector (by reference) of reading pointers
168 uint32_t sendToServer(const std::vector<Reading *>& readings,
169 bool compression, bool skipSentDataTypes = true);
170 
171 // Send a single reading (by reference)
172 uint32_t sendToServer(const Reading& reading,
173 bool skipSentDataTypes = true); // never called
174 
175 // Send a single reading pointer
176 uint32_t sendToServer(const Reading* reading,
177 bool skipSentDataTypes = true); // never called
178 
179 // Set saved OMF formats
180 void setFormatType(const std::string &key, std::string &value);
181 
182 // Set which PIServer component should be used for the communication
183 void setPIServerEndpoint(const OMF_ENDPOINT PIServerEndpoint);
184 
185 // Set the naming scheme of the objects in the endpoint
186 void setNamingScheme(const NAMINGSCHEME_ENDPOINT namingScheme) {m_NamingScheme = namingScheme;};
187 
188 // Generate the container id for the given asset
189 std::string generateMeasurementId(const string& assetName);
190 
191 // Generate a suffix for the given asset in relation to the selected naming schema and the value of the type id
192 std::string generateSuffixType(string &assetName, long typeId);
193 
194 // Get the naming scheme associated with the given asset
    // NOTE(review): presumably the value cached in OMFDataTypes::namingScheme - confirm in omf.cpp.
195 long getNamingScheme(const string& assetName);
196 
    // Accessors for the per-asset Asset Framework hierarchy information.
    // NOTE(review): presumably backed by OMFDataTypes::afhHash, afHierarchy and
    // afHierarchyOrig in the data types cache - confirm in omf.cpp.
197 string getHashStored(const string& assetName);
198 string getPathStored(const string& assetName);
199 string getPathOrigStored(const string& assetName);
200 bool setPathStored(const string& assetName, string &afHierarchy);
201 bool deleteAssetAFH(const string& assetName, string& path);
202 bool createAssetAFH(const string& assetName, string& path);
203 
204 // Set the first level of hierarchy in Asset Framework in which the assets will be created, PI Web API only.
205 void setDefaultAFLocation(const std::string &DefaultAFLocation);
206 
    // Set the Asset Framework map (rules); the bool result presumably reports
    // whether the map was accepted - confirm in omf.cpp.
207 bool setAFMap(const std::string &AFMap);
208 
    // Enable or disable sending of the full structure; per the note on
    // m_sendFullStructure, when disabled the AF hierarchy is not created.
209 void setSendFullStructure(const bool sendFullStructure) {m_sendFullStructure = sendFullStructure;};
210 
211 void setPrefixAFAsset(const std::string &prefixAFAsset);
212 
    // Plain setters: store the value in m_delimiter / m_dataActionCode.
213 void setDelimiter(const std::string &delimiter) {m_delimiter = delimiter;};
214 
215 void setDataActionCode(const std::string &actionCode) {m_dataActionCode = actionCode;};
216 
217 // Get saved OMF formats
218 std::string getFormatType(const std::string &key) const;
219 
220 // Set the list of errors considered not blocking
221 // in the communication with the PI Server
222 void setNotBlockingErrors(std::vector<std::string>& );
223 
224 // Compress string using gzip
    // The default level is zlib's Z_DEFAULT_COMPRESSION.
225 std::string compress_string(const std::string& str,
226 int compressionlevel = Z_DEFAULT_COMPRESSION);
227 
228 // Return current value of global type-id
    // (the top-level 'const' on the by-value return type has no effect for callers)
229 const long getTypeId() const { return m_typeId; };
230 
231 // Check DataTypeError
232 bool isDataTypeError(const char* message);
233 
234 // Check if plugin configuration is working and PI is stable
235 bool isPIstable() { return m_PIstable; };
236 
237 // Check if plugin is connected to PI
238 bool isPIconnected() { return m_connected; };
239 
240 // Set PI connection status
241 void setPIconnected(bool connectionStatus) { m_connected = connectionStatus; };
242 
243 // Get and Set number of blocks of Readings
244 std::size_t getNumBlocks() { return m_numBlocks; };
245 void setNumBlocks(std::size_t numBlocks) { m_numBlocks = numBlocks; };
246 
247 // Map object types found in input data
248 void setMapObjectTypes(const std::vector<Reading *>& data,
249 std::map<std::string, Reading*>& dataSuperSet);
250 // Remove mapped object types found in input data
251 void unsetMapObjectTypes(std::map<std::string, Reading*>& dataSuperSet) const;
252 
    // Store the pointer to the static data (name/value pairs). Only the pointer
    // is kept, no copy is made, so the vector must outlive its use by this object.
253 void setStaticData(std::vector<std::pair<std::string, std::string>> *staticData)
254 {
255 m_staticData = staticData;
256 };
257 
258 void generateAFHierarchyPrefixLevel(string& path, string& prefix, string& AFHierarchyLevel);
259 
260 // Retrieve private objects
    // Both getters return the map by value, i.e. a copy is made on every call.
261 map<std::string, std::string> getNamesRules() const { return m_NamesRules; };
262 map<std::string, std::string> getMetadataRulesExist() const { return m_MetadataRulesExist; };
263 
264 bool getAFMapEmptyNames() const { return m_AFMapEmptyNames; };
265 bool getAFMapEmptyMetadata() const { return m_AFMapEmptyMetadata; };
266 
    // Store the legacy-mode flag in m_legacy.
267 void setLegacyMode(bool legacy) { m_legacy = legacy; };
268 
    // Static naming helpers: each returns a string derived from objName and takes
    // an output flag 'changed'.
    // NOTE(review): presumably *changed reports whether the name was altered - confirm.
269 static std::string ApplyPIServerNamingRulesObj(const std::string &objName, bool *changed);
270 static std::string ApplyPIServerNamingRulesPath(const std::string &objName, bool *changed);
271 static std::string ApplyPIServerNamingRulesInvalidChars(const std::string &objName, bool *changed);
272 
273 static std::string variableValueHandle(const Reading& reading, std::string &AFHierarchy);
274 static bool extractVariable(string &strToHandle, string &variable, string &value, string &defaultValue);
275 static void reportAsset(const string& asset, const string& level, const string& msg);
276 
277 private:
    // Build the header name/value pairs for a message of the given type;
    // the action defaults to "create".
283 const std::vector<std::pair<std::string, std::string>>
284 createMessageHeader(const std::string& type, const std::string& action="create") const;
285 
286 // Create data for Type message for current row
287 const std::string createTypeData(const Reading& reading, OMFHints *hints);
288 
289 // Create data for Container message for current row
290 const std::string createContainerData(const Reading& reading, OMFHints *hints);
291 
292 // Create data for additional type message, with 'Data' for current row
293 const std::string createStaticData(const Reading& reading);
294 
295 // Create data Link message, with 'Data', for current row
296 std::string createLinkData(const Reading& reading, std::string& AFHierarchyLevel, std::string& prefix, std::string& objectPrefix, OMFHints *hints, bool legacy);
297 
    // NOTE(review): presumably creates the data message for the given reading - confirm in omf.cpp.
303 const std::string createMessageData(Reading& reading);
304 
305 // Set the tagName in an assetName Type message
306 void setAssetTypeTag(const std::string& assetName,
307 const std::string& tagName,
308 std::string& data);
309 
310 void setAssetTypeTagNew(const std::string& assetName,
311 const std::string& tagName,
312 std::string& data);
313 
314 // Create the OMF data types if needed
315 bool handleDataTypes(const string keyComplete,
316 const Reading& row,
317 bool skipSendingTypes, OMFHints *hints);
318 
319 // Send OMF data types
320 bool sendDataTypes(const Reading& row, OMFHints *hints);
321 
322 // Get saved dataType
323 bool getCreatedTypes(const std::string& keyComplete, const Reading& row, OMFHints *hints);
324 
325 // Calculate the short type value for the given reading
    // NOTE(review): presumably the value stored in OMFDataTypes::typesShort, built
    // with the t_typeCount union below - confirm in omf.cpp.
326 unsigned long calcTypeShort(const Reading& row);
327 
328 // Clear data types cache
329 void clearCreatedTypes();
330 
331 // Increment type-id value
332 void incrementTypeId();
333 
334 // Handle data type errors
335 bool handleTypeErrors(const string& keyComplete, const Reading& reading, OMFHints*hints);
336 
337 string errorMessageHandler(const string &msg);
338 
339 void handleRESTException(const std::exception &e, const char *mainMessage);
340 
341 void CheckHttpCode(const int httpCode, const std::string &errorMessage);
342 
343 std::string getExceptionMessage(const std::exception &e, OMFError *error);
344 
345 // Extract assetName from error message
346 std::string getAssetNameFromError(const char* message);
347 
348 // Get asset type-id from cached data
349 long getAssetTypeId(const std::string& assetName);
350 
351 // Increment per asset type-id value
352 void incrementAssetTypeId(const std::string& keyComplete);
353 void incrementAssetTypeIdOnly(const std::string& keyComplete);
354 
355 // Set global type-id as the maximum value of all per asset type-ids
356 void setTypeId();
357 
358 // Set saved dataType
359 bool setCreatedTypes(const Reading& row, OMFHints *hints);
360 
361 // Remove cached data types entry for given asset name
362 void clearCreatedTypes(const std::string& keyComplete);
363 
364 // Add the 1st level of AF hierarchy if the end point is PI Web API
365 void setAFHierarchy();
366 
367 bool handleAFHierarchy();
368 bool handleAFHierarchySystemWide();
369 bool handleOmfHintHierarchies();
370 
371 bool sendAFHierarchy(std::string AFHierarchy);
372 
373 bool sendAFHierarchyLevels(std::string parentPath, std::string path, std::string &lastLevel);
374 bool sendAFHierarchyTypes(const std::string AFHierarchyLevel, const std::string prefix);
375 bool sendAFHierarchyStatic(const std::string AFHierarchyLevel, const std::string prefix);
376 bool sendAFHierarchyLink(std::string parent, std::string child, std::string prefixIdParent, std::string prefixId);
377 
378 bool manageAFHierarchyLink(std::string parent, std::string child, std::string prefixIdParent, std::string prefixId, std::string childFull, string action);
379 
    // Send an AF hierarchy message of the given type; the action defaults to "create".
380 bool AFHierarchySendMessage(const std::string& msgType, std::string& jsonData, const std::string& action="create");
381 
382 
383 std::string generateUniquePrefixId(const std::string &path);
384 bool evaluateAFHierarchyRules(const string& assetName, const Reading& reading);
385 void retrieveAFHierarchyPrefixAssetName(const string& assetName, string& prefix, string& AFHierarchyLevel);
386 void retrieveAFHierarchyFullPrefixAssetName(const string& assetName, string& prefix, string& AFHierarchy);
387 
388 bool createAFHierarchyOmfHint(const string& assetName, const string &OmfHintHierarchy);
389 
    // Handlers taking the parsed JSON document (rapidjson::Document).
    // NOTE(review): presumably they process the names and metadata sections of the AF map - confirm.
    // NOTE(review): "Metedata" is a misspelling of "Metadata" in the method name;
    // renaming it requires changing the definition and every caller as well.
390 bool HandleAFMapNames(Document& JSon);
391 bool HandleAFMapMetedata(Document& JSon);
392 
393 // Start of support for using linked containers
394 bool sendBaseTypes();
395 bool sendFledgeAssetType();
396 bool sendAFLinks(Reading& reading, OMFHints *hints);
397 // End of support for using linked containers
398 //
399 string createAFLinks(Reading &reading, OMFHints *hints);
400 
401 
402 private:
403 // Use for the evaluation of the OMFDataTypes.typesShort
    // Overlays eight one-byte fields with a single unsigned long so that the
    // fields can be read back as one integer value.
    // NOTE(review): this relies on unsigned long being 8 bytes wide; where it is
    // 4 bytes (e.g. 32 bit or LLP64 targets) valueLong overlays only the first
    // four fields, and the numeric value depends on the byte order of the platform.
404 union t_typeCount {
405 struct
406 {
407 unsigned char tTotal;
408 unsigned char tFloat;
409 unsigned char tString;
410 unsigned char spare0;
411 
412 unsigned char spare1;
413 unsigned char spare2;
414 unsigned char spare3;
415 unsigned char spare4;
416 } cnt;
417 unsigned long valueLong = 0;
418 };
419 
420 std::string m_assetName;
421 const std::string m_path;
422 long m_typeId;
423 const std::string m_producerToken;
424 OMF_ENDPOINT m_PIServerEndpoint;
425 NAMINGSCHEME_ENDPOINT m_NamingScheme;
426 std::string m_DefaultAFLocation;
427 bool m_sendFullStructure; // If disabled the AF hierarchy is not created.
428 std::string m_delimiter;
429 std::string m_dataActionCode;
430 
431 // Asset Framework Hierarchy Rules handling - Metadata MAP
432 // Documentation: https://fledge-iot.readthedocs.io/en/latest/plugins/fledge-north-OMF/index.html?highlight=hierarchy#asset-framework-hierarchy-rules
433 std::string m_AFMap;
434 bool m_AFMapEmptyNames; // true if there are no rules to manage
435 bool m_AFMapEmptyMetadata;
436 std::string m_AFHierarchyLevel;
437 std::string m_prefixAFAsset;
438 
    // The containers below all start empty; the commented-out entry inside each
    // initializer only illustrates the layout of one element.
439 vector<std::string> m_afhHierarchyAlreadyCreated={
440 
441 // Asset Framework path
442 // {""}
443 };
444 
445 map<std::string, std::string> m_NamesRules={
446 
447 // Asset_name - Asset Framework path
448 // {"", ""}
449 };
450 
451 map<std::string, std::string> m_MetadataRulesExist={
452 
453 // Property - Asset Framework path
454 // {"", ""}
455 };
456 
457 map<std::string, std::string> m_MetadataRulesNonExist={
458 
459 // Property - Asset Framework path
460 // {"", ""}
461 };
462 
463 map<std::string, vector<pair<string, string>>> m_MetadataRulesEqual={
464 
465 // Property - Value - Asset Framework path
466 // {"", {{"", ""}} }
467 };
468 
469 map<std::string, vector<pair<string, string>>> m_MetadataRulesNotEqual={
470 
471 // Property - Value - Asset Framework path
472 // {"", {{"", ""}} }
473 };
474 
475 map<std::string, vector<pair<string, string>>> m_AssetNamePrefix ={
476 
477 // Property - Hierarchy - prefix
478 // {"", {{"", ""}} }
479 };
480 
481 // Define the OMF format to use for each type
482 // the format will not be applied if the string is empty
483 std::map<const std::string, std::string> m_formatTypes {
484 {OMF_TYPE_STRING, ""},
485 {OMF_TYPE_INTEGER,"int64"},
486 {OMF_TYPE_FLOAT, "float64"},
487 {OMF_TYPE_UNSUPPORTED, "unsupported"}
488 };
489 
490 // Vector with OMF_TYPES
    // Note that OMF_TYPE_FLOAT appears twice and OMF_TYPE_INTEGER is absent:
    // the second entry takes the place of the integer type.
491 const std::vector<std::string> omfTypes = { OMF_TYPE_STRING,
492 OMF_TYPE_FLOAT, // Forces the creation of float also for integer numbers
493 OMF_TYPE_FLOAT,
494 OMF_TYPE_UNSUPPORTED};
495 // HTTP Sender interface
    // Reference member: it is bound at construction and cannot be rebound later.
496 HttpSender& m_sender;
497 bool m_changeTypeId;
498 
499 // These errors are considered not blocking in the communication
500 // with the destination, the sending operation will proceed
501 // with the next block of data if one of these is encountered
502 std::vector<std::string> m_notBlockingErrors;
503 
504 // Data types cache[key] = (key_type_id, key data types)
505 std::map<std::string, OMFDataTypes>* m_OMFDataTypes;
506 
507 // Stores the type for the block of data containing all the used properties
508 std::map<string, Reading*> m_SuperSetDataPoints;
509 
    // Static data as name/value pairs; set through setStaticData(), which stores
    // the caller's pointer without copying the vector.
513 std::vector<std::pair<std::string, std::string>> *m_staticData;
514 
515 
    // OMF version string, set by setOMFVersion()
519 std::string m_OMFVersion;
520 
    // Set by setOMFVersion(): false for versions "1.0" and "1.1", true otherwise
524 bool m_linkedProperties;
525 
    // Per-key state for linked assets.
    // NOTE(review): the key is presumably the asset name - confirm in omf.cpp.
532 std::unordered_map<std::string, LALookup>
533 m_linkedAssetState;
534 
    // Legacy-mode flag, set by setLegacyMode()
538 bool m_legacy;
539 
    // Static member: a single list shared by every OMF instance; as a non-inline
    // static data member it needs a definition in exactly one source file.
    // NOTE(review): presumably the assets already passed to reportAsset() - confirm.
544 static std::vector<std::string>
545 m_reportedAssets;
549 const std::string m_name;
550 
    // NOTE(review): presumably records that sendBaseTypes() has already succeeded - confirm.
554 bool m_baseTypesSent;
555 
    // Value returned by isPIstable()
560 bool m_PIstable;
561 
    // Connection status, read by isPIconnected() and written by setPIconnected()
565 bool m_connected;
566 
    // Number of blocks of readings, see getNumBlocks() / setNumBlocks()
570 std::size_t m_numBlocks;
571 };
572 
// Helper whose constructor takes an OMFBuffer payload and a reading.
// NOTE(review): the constructor presumably appends the OMF data for the reading
// to 'payload'; its definition is not in this header - confirm in omf.cpp.
581 class OMFData
582 {
583 public:
    // payload            buffer passed by non-const reference
    // reading            the reading to process
    // measurementId      taken by value
    // needDelim          NOTE(review): presumably requests a delimiter before the
    //                    data added for this reading - confirm
    // PIServerEndpoint   defaults to ENDPOINT_CR
    // DefaultAFLocation  defaults to an empty string
    // hints              optional OMF hints, defaults to NULL
584 OMFData(OMFBuffer & payload,
585 const Reading& reading,
586 string measurementId,
587 bool needDelim,
588 const OMF_ENDPOINT PIServerEndpoint = ENDPOINT_CR,
589 const std::string& DefaultAFLocation = std::string(),
590 OMFHints *hints = NULL);
    // Returns the m_hasData flag.
    // NOTE(review): presumably set by the constructor when data was produced for the reading - confirm.
591 bool hasData() { return m_hasData; };
592 private:
593 bool m_hasData;
594 };
595 
596 #endif
Reading
An asset reading represented as a class.
Definition: reading.h:33
OMF
The OMF class.
Definition: omf.h:109
OMFData
The OMFData class.
Definition: omf.h:581
OMFBuffer
Buffer class designed to hold OMF payloads that can grow as required but have minimal copy semantics.
Definition: omfbuffer.h:24
HttpSender
Definition: http_sender.h:20
OMFDataTypes
Per asset dataTypes - This class is used in a std::map where assetName is a key.
Definition: omf.h:89
OMFError
An encapsulation of an error return from an OMF call.
Definition: omferror.h:21
OMFHints
A set of hints for a reading.
Definition: OMFHint.h:160