add mock interface

This commit is contained in:
Liu Jicong 2021-12-20 09:55:56 +08:00
parent 0e69034454
commit 894ff93085
2 changed files with 20 additions and 6 deletions

View File

@ -95,14 +95,24 @@ typedef struct STqTopicVhandle {
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
typedef struct STqExec {
void* runtimeEnv;
// return type will be SSDataBlock
void* (*exec)(void* runtimeEnv);
// inputData type will be submitblk
void* (*assign)(void* runtimeEnv, void* inputData);
char* (*serialize)(struct STqExec*);
struct STqExec* (*deserialize)(char*);
} STqExec;
typedef struct STqBufferItem { typedef struct STqBufferItem {
int64_t offset; int64_t offset;
// executors are identical but not concurrent // executors are identical but not concurrent
// so there must be a copy in each item // so there must be a copy in each item
void* executor; STqExec* executor;
int32_t status; int32_t status;
int64_t size; int64_t size;
void* content; void* content;
} STqMsgItem; } STqMsgItem;
typedef struct STqTopic { typedef struct STqTopic {
@ -248,10 +258,12 @@ typedef struct STQ {
STqLogReader* tqLogReader; STqLogReader* tqLogReader;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqExec* tqExec;
} STQ; } STQ;
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac,
STqExec* tqExec);
void tqClose(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type

View File

@ -35,7 +35,8 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac,
STqExec* tqExec) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
// TODO: memory error // TODO: memory error
@ -54,6 +55,7 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
// TODO: free STQ // TODO: free STQ
return NULL; return NULL;
} }
pTq->tqExec = tqExec;
return pTq; return pTq;
} }