7 #include <condition_variable> 9 #include <storage_client.h> 11 #include <filter_pipeline.h> 12 #include <service_handler.h> 13 #include <perfmonitors.h> 15 #define DEFAULT_BLOCK_SIZE 100 26 DataLoad(
const std::string& name,
long streamId,
42 bool isRunning() {
return !m_shutdown; };
/**
 * Configuration change for one of the filters or to the pipeline.
 *
 * Declaration only; the definition is not part of this listing.
 *
 * @param category   Name of the configuration category that changed
 * @param newConfig  The new configuration content for that category
 */
void configChange(const std::string& category, const std::string& newConfig);
/**
 * Child-category creation callback.
 *
 * Intentionally a no-op: all three string arguments are unnamed and ignored.
 */
void configChildCreate(const std::string&, const std::string&, const std::string&) {};
/**
 * Child-category deletion callback.
 *
 * Intentionally a no-op: both string arguments are unnamed and ignored.
 */
void configChildDelete(const std::string&, const std::string&) {};
46 unsigned long getLastFetched() {
return m_lastFetched; };
47 void setBlockSize(
unsigned long blockSize)
49 m_blockSize = blockSize;
51 void setStreamUpdate(
unsigned long streamUpdate)
53 m_streamUpdate = streamUpdate;
54 m_nextStreamUpdate = streamUpdate;
57 const std::string &getName() {
return m_name; };
59 void setPrefetchLimit(
unsigned int limit)
61 m_prefetchLimit = limit;
65 void readBlock(
unsigned int blockSize);
66 unsigned int waitForReadRequest();
67 unsigned long getLastSentId();
68 int createNewStream();
69 ReadingSet *fetchStatistics(
unsigned int blockSize);
70 ReadingSet *fetchAudit(
unsigned int blockSize);
72 bool loadFilters(
const std::string& category);
75 const std::string& m_name;
78 volatile bool m_shutdown;
79 std::thread *m_thread;
81 std::condition_variable m_cv;
82 std::condition_variable m_fetchCV;
83 unsigned int m_readRequest;
84 enum { SourceReadings, SourceStatistics, SourceAudit }
86 unsigned long m_lastFetched;
87 std::deque<ReadingSet *>
91 std::mutex m_pipelineMutex;
92 unsigned long m_blockSize;
95 unsigned long m_streamSent;
96 int m_nextStreamUpdate;
97 unsigned int m_prefetchLimit;
A class used in the North service to load data from the buffer.
Definition: data_load.h:24
static void pipelineEnd(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Use the current readings (they have been filtered by all filters)
Definition: data_load.cpp:611
ServiceHandler abstract class - the interface that services using the management API must provide...
Definition: service_handler.h:20
void shutdown()
External call to shut down the north service.
Definition: data_load.cpp:87
Reading set class.
Definition: reading_set.h:26
ReadingSet * fetchReadings(bool wait)
Fetch Readings.
Definition: data_load.cpp:417
Client for accessing the storage service.
Definition: storage_client.h:43
void flushLastSentId()
Flush the last sent Id to the storage layer.
Definition: data_load.cpp:512
The FilterPipeline class is used to represent a pipeline of filters applicable to a task/service...
Definition: filter_pipeline.h:28
bool setDataSource(const std::string &source)
Set the source of data for the service.
Definition: data_load.cpp:107
virtual ~DataLoad()
DataLoad destructor.
Definition: data_load.cpp:55
static void passToOnwardFilter(OUTPUT_HANDLE *outHandle, READINGSET *readings)
Pass the current readings set to the next filter in the pipeline.
Definition: data_load.cpp:581
void updateLastSentId(unsigned long id)
Update the last sent ID for our stream.
Definition: data_load.cpp:498
void loadThread()
The background thread that loads data from the database.
Definition: data_load.cpp:127
void triggerRead(unsigned int blockSize)
Trigger the loading thread to read a block of data.
Definition: data_load.cpp:166
void configChange(const std::string &category, const std::string &newConfig)
Configuration change for one of the filters or to the pipeline.
Definition: data_load.cpp:652
DataLoad(const std::string &name, long streamId, StorageClient *storage)
DataLoad Constructor.
Definition: data_load.cpp:30
void restart()
External call to restart the north service.
Definition: data_load.cpp:97