tq consume refactor
This commit is contained in:
parent
6beee204d8
commit
4b25449576
|
@ -23,6 +23,7 @@
|
|||
#include "taosmsg.h"
|
||||
#include "tlist.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -103,7 +104,7 @@ typedef struct STqTopicVhandle {
|
|||
typedef struct STqExec {
|
||||
void* runtimeEnv;
|
||||
SSDataBlock* (*exec)(void* runtimeEnv);
|
||||
void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData);
|
||||
void* (*assign)(void* runtimeEnv, void* inputData);
|
||||
void (*clear)(void* runtimeEnv);
|
||||
char* (*serialize)(struct STqExec*);
|
||||
struct STqExec* (*deserialize)(char*);
|
||||
|
@ -114,11 +115,9 @@ typedef struct STqRspHandle {
|
|||
void* ahandle;
|
||||
} STqRspHandle;
|
||||
|
||||
typedef enum {
|
||||
TQ_ITEM_READY,
|
||||
TQ_ITEM_PROCESS,
|
||||
TQ_ITEM_EMPTY
|
||||
} STqItemStatus;
|
||||
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
|
||||
|
||||
typedef struct STqTopic STqTopic;
|
||||
|
||||
typedef struct STqBufferItem {
|
||||
int64_t offset;
|
||||
|
@ -128,19 +127,21 @@ typedef struct STqBufferItem {
|
|||
int32_t status;
|
||||
int64_t size;
|
||||
void* content;
|
||||
STqTopic* pTopic;
|
||||
} STqMsgItem;
|
||||
|
||||
typedef struct STqTopic {
|
||||
struct STqTopic {
|
||||
// char* topic; //c style, end with '\0'
|
||||
// int64_t cgId;
|
||||
// void* ahandle;
|
||||
// int32_t head;
|
||||
// int32_t tail;
|
||||
int64_t nextConsumeOffset;
|
||||
int64_t floatingCursor;
|
||||
int64_t topicId;
|
||||
int32_t head;
|
||||
int32_t tail;
|
||||
void* logReader;
|
||||
STqMsgItem buffer[TQ_BUFFER_SIZE];
|
||||
} STqTopic;
|
||||
};
|
||||
|
||||
typedef struct STqListHandle {
|
||||
STqTopic topic;
|
||||
|
@ -152,7 +153,7 @@ typedef struct STqGroup {
|
|||
int64_t cgId;
|
||||
void* ahandle;
|
||||
int32_t topicNum;
|
||||
//STqList* head;
|
||||
STqList* head;
|
||||
SList* topicList; // SList<STqTopic>
|
||||
STqRspHandle rspHandle;
|
||||
} STqGroup;
|
||||
|
@ -162,20 +163,23 @@ typedef struct STqQueryMsg {
|
|||
struct STqQueryMsg* next;
|
||||
} STqQueryMsg;
|
||||
|
||||
typedef struct STqLogReader {
|
||||
typedef struct STqLogHandle {
|
||||
void* logHandle;
|
||||
int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
|
||||
void* (*openLogReader)(void* logHandle);
|
||||
void (*closeLogReader)(void* logReader);
|
||||
int32_t (*logRead)(void* logReader, void** data, int64_t ver);
|
||||
|
||||
int64_t (*logGetFirstVer)(void* logHandle);
|
||||
int64_t (*logGetSnapshotVer)(void* logHandle);
|
||||
int64_t (*logGetLastVer)(void* logHandle);
|
||||
} STqLogReader;
|
||||
} STqLogHandle;
|
||||
|
||||
typedef struct STqCfg {
|
||||
// TODO
|
||||
} STqCfg;
|
||||
|
||||
typedef struct STqMemRef {
|
||||
SMemAllocatorFactory* pAlloctorFactory;
|
||||
SMemAllocatorFactory* pAllocatorFactory;
|
||||
SMemAllocator* pAllocator;
|
||||
} STqMemRef;
|
||||
|
||||
|
@ -265,13 +269,24 @@ typedef struct STQ {
|
|||
// the handle of meta kvstore
|
||||
char* path;
|
||||
STqCfg* tqConfig;
|
||||
STqLogReader* tqLogReader;
|
||||
STqLogHandle* tqLogHandle;
|
||||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
} STQ;
|
||||
|
||||
typedef struct STqMgmt {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqMgmt;
|
||||
|
||||
static STqMgmt tqMgmt;
|
||||
|
||||
// init once
|
||||
int tqInit();
|
||||
void tqCleanUp();
|
||||
|
||||
// open in each vnode
|
||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
|
||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac);
|
||||
void tqClose(STQ*);
|
||||
|
||||
// void* will be replace by a msg type
|
||||
|
|
|
@ -37,7 +37,22 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
|
|||
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
|
||||
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
|
||||
|
||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
|
||||
int tqInit() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
||||
if(old == 1) return 0;
|
||||
|
||||
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tqCleanUp() {
|
||||
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
|
||||
if(old == 0) return;
|
||||
taosTmrStop(tqMgmt.timer);
|
||||
taosTmrCleanUp(tqMgmt.timer);
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) {
|
||||
STQ* pTq = malloc(sizeof(STQ));
|
||||
if (pTq == NULL) {
|
||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||
|
@ -45,8 +60,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
|
|||
}
|
||||
pTq->path = strdup(path);
|
||||
pTq->tqConfig = tqConfig;
|
||||
pTq->tqLogReader = tqLogReader;
|
||||
pTq->tqMemRef.pAlloctorFactory = allocFac;
|
||||
pTq->tqLogHandle = tqLogHandle;
|
||||
pTq->tqMemRef.pAllocatorFactory = allocFac;
|
||||
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
||||
if (pTq->tqMemRef.pAllocator == NULL) {
|
||||
// TODO: error code of buffer pool
|
||||
|
@ -360,9 +375,9 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
|||
(*pRsp)->pCont = ptr;
|
||||
break;
|
||||
}
|
||||
*((int64_t*)buffer) = htonll(pTopic->topicId);
|
||||
*((int64_t*)buffer) = htobe64(pTopic->topicId);
|
||||
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
||||
*((int64_t*)buffer) = htonll(pTopic->buffer[idx].size);
|
||||
*((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size);
|
||||
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
||||
memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
|
||||
buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
|
||||
|
@ -384,41 +399,42 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
|||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfMsgs > 0) {
|
||||
// set code and other msg
|
||||
rpcSendResponse(*pRsp);
|
||||
} else {
|
||||
// most recent data has been fetched
|
||||
|
||||
// enable timer for blocking wait
|
||||
// once new data written when waiting, launch query and rsp
|
||||
}
|
||||
|
||||
// fetched a num of msgs, rpc response
|
||||
for(int i = 0; i < pArray->size; i++) {
|
||||
STqMsgItem* pItem = taosArrayGet(pArray, i);
|
||||
|
||||
void* raw;
|
||||
//read from wal
|
||||
void* raw = NULL;
|
||||
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
|
||||
int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
|
||||
if(code < 0) {
|
||||
//TODO: error
|
||||
}
|
||||
//get msgType
|
||||
//if submitblk
|
||||
pItem->executor->assign(pItem->executor->runtimeEnv, raw);
|
||||
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
|
||||
pItem->content = content;
|
||||
//if other type, send just put into buffer
|
||||
pItem->content = raw;
|
||||
/*pItem->content = raw;*/
|
||||
|
||||
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
|
||||
ASSERT(old == TQ_ITEM_PROCESS);
|
||||
|
||||
}
|
||||
|
||||
if (numOfMsgs < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (numOfMsgs == 0) {
|
||||
// most recent data has been fetched
|
||||
|
||||
// enable timer for blocking wait
|
||||
// once new data written when waiting, launch query and rsp
|
||||
return -1;
|
||||
}
|
||||
|
||||
// fetched a num of msgs, rpc response
|
||||
taosArrayDestroy(pArray);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -500,10 +516,10 @@ void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
|
|||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||
*(int64_t*)ptr = pTopic->topicId;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||
*(int32_t*)ptr = pTopic->head;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
||||
*(int32_t*)ptr = pTopic->tail;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
||||
/**(int32_t*)ptr = pTopic->head;*/
|
||||
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||
/**(int32_t*)ptr = pTopic->tail;*/
|
||||
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
|
||||
}
|
||||
|
@ -557,10 +573,10 @@ const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
|
|||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||
topic->topicId = *(int64_t*)ptr;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||
topic->head = *(int32_t*)ptr;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
||||
topic->tail = *(int32_t*)ptr;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
||||
/*topic->head = *(int32_t*)ptr;*/
|
||||
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||
/*topic->tail = *(int32_t*)ptr;*/
|
||||
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue