From 4b25449576ac0d0c182e5f1b57ba2238f9233b5e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 23 Dec 2021 13:25:55 +0800 Subject: [PATCH] tq consume refactor --- include/dnode/vnode/tq/tq.h | 65 +++++++++++++++++----------- source/dnode/vnode/tq/src/tq.c | 78 ++++++++++++++++++++-------------- 2 files changed, 87 insertions(+), 56 deletions(-) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 5eeaaa1011..f5d5cc9a16 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -23,6 +23,7 @@ #include "taosmsg.h" #include "tlist.h" #include "trpc.h" +#include "ttimer.h" #include "tutil.h" #ifdef __cplusplus @@ -103,7 +104,7 @@ typedef struct STqTopicVhandle { typedef struct STqExec { void* runtimeEnv; SSDataBlock* (*exec)(void* runtimeEnv); - void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData); + void* (*assign)(void* runtimeEnv, void* inputData); void (*clear)(void* runtimeEnv); char* (*serialize)(struct STqExec*); struct STqExec* (*deserialize)(char*); @@ -114,33 +115,33 @@ typedef struct STqRspHandle { void* ahandle; } STqRspHandle; -typedef enum { - TQ_ITEM_READY, - TQ_ITEM_PROCESS, - TQ_ITEM_EMPTY -} STqItemStatus; +typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; + +typedef struct STqTopic STqTopic; typedef struct STqBufferItem { int64_t offset; // executors are identical but not concurrent // so there must be a copy in each item - STqExec* executor; - int32_t status; - int64_t size; - void* content; + STqExec* executor; + int32_t status; + int64_t size; + void* content; + STqTopic* pTopic; } STqMsgItem; -typedef struct STqTopic { +struct STqTopic { // char* topic; //c style, end with '\0' // int64_t cgId; // void* ahandle; + // int32_t head; + // int32_t tail; int64_t nextConsumeOffset; int64_t floatingCursor; int64_t topicId; - int32_t head; - int32_t tail; + void* logReader; STqMsgItem buffer[TQ_BUFFER_SIZE]; -} STqTopic; +}; typedef struct STqListHandle { STqTopic topic; @@ -148,11 +149,11 @@ typedef struct STqListHandle { } STqList; typedef struct STqGroup { - int64_t clientId; - int64_t cgId; - void* ahandle; - int32_t topicNum; - //STqList* head; + int64_t clientId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + STqList* head; SList* topicList; // SList STqRspHandle rspHandle; } STqGroup; @@ -162,20 +163,23 @@ typedef struct STqQueryMsg { struct STqQueryMsg* next; } STqQueryMsg; -typedef struct STqLogReader { +typedef struct STqLogHandle { void* logHandle; - int32_t (*logRead)(void* logHandle, void** data, int64_t ver); + void* (*openLogReader)(void* logHandle); + void (*closeLogReader)(void* logReader); + int32_t (*logRead)(void* logReader, void** data, int64_t ver); + int64_t (*logGetFirstVer)(void* logHandle); int64_t (*logGetSnapshotVer)(void* logHandle); int64_t (*logGetLastVer)(void* logHandle); -} STqLogReader; +} STqLogHandle; typedef struct STqCfg { // TODO } STqCfg; typedef struct STqMemRef { - SMemAllocatorFactory* pAlloctorFactory; + SMemAllocatorFactory* pAllocatorFactory; SMemAllocator* pAllocator; } STqMemRef; @@ -265,13 +269,24 @@ typedef struct STQ { // the handle of meta kvstore char* path; STqCfg* tqConfig; - STqLogReader* tqLogReader; + STqLogHandle* tqLogHandle; STqMemRef tqMemRef; STqMetaStore* tqMeta; } STQ; +typedef struct STqMgmt { + int8_t inited; + tmr_h timer; +} STqMgmt; + +static STqMgmt tqMgmt; + +// init once +int tqInit(); +void tqCleanUp(); + // open in each vnode -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // void* will be replace by a msg type diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 972740183d..9e5aaf61fc 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -37,7 +37,22 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr); const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { +int tqInit() { + int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); + if(old == 1) return 0; + + tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); + return 0; +} + +void tqCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0); + if(old == 0) return; + taosTmrStop(tqMgmt.timer); + taosTmrCleanUp(tqMgmt.timer); +} + +STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -45,8 +60,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA } pTq->path = strdup(path); pTq->tqConfig = tqConfig; - pTq->tqLogReader = tqLogReader; - pTq->tqMemRef.pAlloctorFactory = allocFac; + pTq->tqLogHandle = tqLogHandle; + pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { // TODO: error code of buffer pool @@ -360,9 +375,9 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { (*pRsp)->pCont = ptr; break; } - *((int64_t*)buffer) = htonll(pTopic->topicId); + *((int64_t*)buffer) = htobe64(pTopic->topicId); buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - *((int64_t*)buffer) = htonll(pTopic->buffer[idx].size); + *((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size); buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size); buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size); @@ -384,41 +399,42 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { } else { ASSERT(0); } - } } + if (numOfMsgs > 0) { + // set code and other msg + rpcSendResponse(*pRsp); + } else { + // most recent data has been fetched + + // enable timer for blocking wait + // once new data written when waiting, launch query and rsp + } + + // fetched a num of msgs, rpc response for(int i = 0; i < pArray->size; i++) { STqMsgItem* pItem = taosArrayGet(pArray, i); - void* raw; //read from wal + void* raw = NULL; + /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ + int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset); + if(code < 0) { + //TODO: error + } //get msgType //if submitblk pItem->executor->assign(pItem->executor->runtimeEnv, raw); SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv); pItem->content = content; //if other type, send just put into buffer - pItem->content = raw; + /*pItem->content = raw;*/ int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY); ASSERT(old == TQ_ITEM_PROCESS); - } - - if (numOfMsgs < 0) { - return -1; - } - - if (numOfMsgs == 0) { - // most recent data has been fetched - - // enable timer for blocking wait - // once new data written when waiting, launch query and rsp - return -1; - } - - // fetched a num of msgs, rpc response + taosArrayDestroy(pArray); return 0; } @@ -500,10 +516,10 @@ void* tqSerializeTopic(STqTopic* pTopic, void* ptr) { ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = pTopic->topicId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int32_t*)ptr = pTopic->head; - ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - *(int32_t*)ptr = pTopic->tail; - ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + /**(int32_t*)ptr = pTopic->head;*/ + /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ + /**(int32_t*)ptr = pTopic->tail;*/ + /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ for (int i = 0; i < TQ_BUFFER_SIZE; i++) { ptr = tqSerializeItem(&pTopic->buffer[i], ptr); } @@ -557,10 +573,10 @@ const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) { ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); topic->topicId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - topic->head = *(int32_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - topic->tail = *(int32_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + /*topic->head = *(int32_t*)ptr;*/ + /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ + /*topic->tail = *(int32_t*)ptr;*/ + /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ for (int i = 0; i < TQ_BUFFER_SIZE; i++) { ptr = tqDeserializeItem(ptr, &topic->buffer[i]); }