Fledge
An open source edge computing platform for industrial users
stream_handler.h
1 #ifndef _STREAM_HANDLER_H
2 #define _STREAM_HANDLER_H
3 /*
4  * Fledge storage service.
5  *
6  * Copyright (c) 2019 Dianomic Systems Inc.
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch
11  */
12 #include <thread>
13 #include <mutex>
14 #include <condition_variable>
15 #include <vector>
16 #include <map>
17 #include <sys/epoll.h>
18 #include <reading_stream.h>
19 
20 #define MAX_EVENTS 40 // Number of epoll events in one epoll_wait call
21 #define RDS_BLOCK 10000 // Number of readings to insert in each call to the storage plugin
22 #define BLOCK_POOL_SIZES 512 // Increments of block sizes in a block pool
23 
24 class StorageApi;
25 
27  public:
30  void handler();
31  uint32_t createStream(uint32_t *token);
32  private:
33  class Stream {
34  public:
35  Stream();
36  ~Stream();
37  uint32_t create(int epollfd, uint32_t *token);
38  void handleEvent(int epollfd, StorageApi *api, uint32_t events);
39  private:
46  class MemoryPool {
47  public:
48  MemoryPool(size_t blkIncr) : m_blkIncr(blkIncr) {};
49  ~MemoryPool();
50  void *allocate(size_t size);
51  void release(void *handle);
52  private:
53  size_t rndSize(size_t size)
54  {
55  return m_blkIncr * ((size + m_blkIncr - 1)
56  / m_blkIncr);
57  };
58  void createPool(size_t size);
59  void growPool(std::vector<void *>*, size_t);
60  size_t m_blkIncr;
61  std::map<size_t, std::vector<void *>* >
62  m_pool;
63  };
64  void setNonBlocking(int fd);
65  unsigned int available(int fd);
66  void queueInsert(StorageApi *api, unsigned int nReadings, bool commit);
67  void dump(int n);
68  enum { Closed, Listen, AwaitingToken, Connected }
69  m_status;
70  int m_socket;
71  uint16_t m_port;
72  uint32_t m_token;
73  uint32_t m_blockNo;
74  enum { BlkHdr, RdHdr, RdBody }
75  m_protocolState;
76  uint32_t m_readingNo;
77  uint32_t m_blockSize;
78  size_t m_readingSize;
79  struct epoll_event
80  m_event;
81  ReadingStream *m_readings[RDS_BLOCK+1];
82  ReadingStream *m_currentReading;
83  MemoryPool *m_blockPool;
84  std::string m_lastAsset;
85  bool m_sameAsset;
86  };
87  StorageApi *m_api;
88  std::thread m_handlerThread;
89  int m_tokens;
90  std::condition_variable m_streamsCV;
91  std::mutex m_streamsMutex;
92  std::vector<Stream *> m_streams;
93  bool m_running;
94  int m_pollfd;
95 };
96 #endif
Definition: stream_handler.h:26
uint32_t createStream(uint32_t *token)
Create a new stream and add it to the epoll mechanism for the stream handler.
Definition: stream_handler.cpp:108
~StreamHandler()
Destructor for the StreamHandler.
Definition: stream_handler.cpp:51
StreamHandler(StorageApi *)
Constructor for the StreamHandler class.
Definition: stream_handler.cpp:40
Definition: reading_stream.h:42
void handler()
The handler method for the stream handler.
Definition: stream_handler.cpp:63
The Storage API class - this class is responsible for the registration of all API entry points in the...
Definition: storage_api.h:84