Fledge
An open source edge computing platform for industrial users
stream_handler.h
1 #ifndef _STREAM_HANDLER_H
2 #define _STREAM_HANDLER_H
3 /*
4  * Fledge storage service.
5  *
6  * Copyright (c) 2019 Dianomic Systems Inc.
7  *
8  * Released under the Apache 2.0 Licence
9  *
10  * Author: Mark Riddoch
11  */
12 #include <thread>
13 #include <mutex>
14 #include <condition_variable>
15 #include <vector>
16 #include <map>
17 #include <sys/epoll.h>
18 #include <reading_stream.h>
19 #include <string>
20 
21 #define MAX_EVENTS 40 // Number of epoll events in one epoll_wait call
22 #define RDS_BLOCK 10000 // Number of readings to insert in each call to the storage plugin
23 #define BLOCK_POOL_SIZES 512 // Increments of block sizes in a block pool
24 
25 class StorageApi;
26 
28  public:
31  void handler();
32  uint32_t createStream(uint32_t *token);
33  private:
34  class Stream {
35  public:
36  Stream();
37  ~Stream();
38  uint32_t create(int epollfd, uint32_t *token);
39  void handleEvent(int epollfd, StorageApi *api, uint32_t events);
40  private:
47  class MemoryPool {
48  public:
49  MemoryPool(size_t blkIncr) : m_blkIncr(blkIncr) {};
50  ~MemoryPool();
51  void *allocate(size_t size);
52  void release(void *handle);
53  private:
54  size_t rndSize(size_t size)
55  {
56  return m_blkIncr * ((size + m_blkIncr - 1)
57  / m_blkIncr);
58  };
59  void createPool(size_t size);
60  void growPool(std::vector<void *>*, size_t);
61  size_t m_blkIncr;
62  std::map<size_t, std::vector<void *>* >
63  m_pool;
64  };
65  void setNonBlocking(int fd);
66  unsigned int available(int fd);
67  void queueInsert(StorageApi *api, unsigned int nReadings, bool commit);
68  void dump(int n);
69  enum { Closed, Listen, AwaitingToken, Connected }
70  m_status;
71  int m_socket;
72  uint16_t m_port;
73  uint32_t m_token;
74  uint32_t m_blockNo;
75  enum { BlkHdr, RdHdr, RdBody }
76  m_protocolState;
77  uint32_t m_readingNo;
78  uint32_t m_blockSize;
79  size_t m_readingSize;
80  struct epoll_event
81  m_event;
82  ReadingStream *m_readings[RDS_BLOCK+1];
83  ReadingStream *m_currentReading;
84  MemoryPool *m_blockPool;
85  std::string m_lastAsset;
86  bool m_sameAsset;
87  };
88  StorageApi *m_api;
89  std::thread m_handlerThread;
90  int m_tokens;
91  std::condition_variable m_streamsCV;
92  std::mutex m_streamsMutex;
93  std::vector<Stream *> m_streams;
94  bool m_running;
95  int m_pollfd;
96 };
97 #endif
StreamHandler
Definition: stream_handler.h:27
StreamHandler::~StreamHandler
~StreamHandler()
Destructor for the StreamHandler.
Definition: stream_handler.cpp:51
ReadingStream
Definition: reading_stream.h:42
StreamHandler::handler
void handler()
The handler method for the stream handler.
Definition: stream_handler.cpp:63
StorageApi
The Storage API class - this class is responsible for the registration of all API entry points in the...
Definition: storage_api.h:84
StreamHandler::createStream
uint32_t createStream(uint32_t *token)
Create a new stream and add it to the epoll mechanism for the stream handler.
Definition: stream_handler.cpp:108
StreamHandler::StreamHandler
StreamHandler(StorageApi *)
Constructor for the StreamHandler class.
Definition: stream_handler.cpp:40