From 35749cb374279728b2ddc27d4ef8f3ddb322773b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 16:12:02 +0800 Subject: [PATCH 1/2] fix dependency for vnode --- include/common/tmsg.h | 19 +++++---- include/util/tcoding.h | 16 ++++--- source/dnode/vnode/inc/tq.h | 35 ++++------------ source/dnode/vnode/inc/vnode.h | 32 +++++++++++++- source/dnode/vnode/src/inc/tqInt.h | 1 + source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/tq/tq.c | 56 +++++++++++++++---------- source/dnode/vnode/src/vnd/vnodeMain.c | 2 +- source/libs/executor/src/executor.c | 8 ++-- source/libs/executor/src/executorimpl.c | 6 ++- source/libs/scheduler/src/scheduler.c | 8 ++-- 11 files changed, 110 insertions(+), 74 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b468456cb7..62f55609ce 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1529,17 +1529,22 @@ typedef struct SMqSetCVgReq { } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { - int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen; - if (buf == NULL) return tlen; - memcpy(*buf, pMsg, tlen); - *buf = POINTER_SHIFT(*buf, tlen); + int32_t tlen = 0; + tlen += taosEncodeFixedU64(buf, pMsg->sId); + tlen += taosEncodeFixedU64(buf, pMsg->queryId); + tlen += taosEncodeFixedU64(buf, pMsg->taskId); + tlen += taosEncodeFixedU32(buf, pMsg->contentLen); + tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); return tlen; } static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { - int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen; - memcpy(pMsg, buf, tlen); - return POINTER_SHIFT(buf, tlen); + buf = taosDecodeFixedU64(buf, &pMsg->sId); + buf = taosDecodeFixedU64(buf, &pMsg->queryId); + buf = taosDecodeFixedU64(buf, &pMsg->taskId); + buf = taosDecodeFixedU32(buf, &pMsg->contentLen); + buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); + return buf; } static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 8198787048..4bb0bde4bb 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -372,9 +372,10 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) { } // ---- binary -static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valueLen) { +static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t valueLen) { int tlen = 0; + tlen += taosEncodeVariantI32(buf, valueLen); if (buf != NULL) { memcpy(*buf, value, valueLen); *buf = POINTER_SHIFT(*buf, valueLen); @@ -384,14 +385,19 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valu return tlen; } -static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int valueLen) { - uint64_t size = 0; +static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valueLen) { *value = malloc((size_t)valueLen); if (*value == NULL) return NULL; - memcpy(*value, buf, (size_t)size); + memcpy(*value, buf, (size_t)valueLen); - return POINTER_SHIFT(buf, size); + return POINTER_SHIFT(buf, valueLen); +} + +static FORCE_INLINE void *taosDecodeBinaryTo(void *buf, void *value, int32_t valueLen) { + + memcpy(value, buf, (size_t)valueLen); + return POINTER_SHIFT(buf, valueLen); } #endif diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index f49542b5ec..9cd6c3d365 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -17,11 +17,12 @@ #define _TD_TQ_H_ #include "common.h" +#include "executor.h" +#include "vnode.h" #include "mallocator.h" #include "meta.h" #include "os.h" #include "scheduler.h" -#include "executor.h" #include "taoserror.h" #include "tlist.h" #include "tmsg.h" @@ -148,10 +149,10 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int8_t status; - int64_t offset; - void* dst; - qTaskInfo_t task; + int8_t status; + int64_t offset; + void* dst; + qTaskInfo_t task; } STqTaskItem; // new version @@ -184,10 +185,6 @@ typedef struct STqQueryMsg { struct STqQueryMsg* next; } STqQueryMsg; -typedef struct STqCfg { - // TODO -} STqCfg; - typedef struct STqMemRef { SMemAllocatorFactory* pAllocatorFactory; SMemAllocator* pAllocator; @@ -284,6 +281,7 @@ typedef struct STQ { STqMemRef tqMemRef; STqMetaStore* tqMeta; SWal* pWal; + SMeta* pMeta; } STQ; typedef struct STqMgmt { @@ -298,7 +296,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // void* will be replace by a msg type @@ -320,23 +318,6 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq); -typedef struct STqReadHandle { - int64_t ver; - SSubmitMsg* pMsg; - SSubmitBlk* pBlock; - SSubmitMsgIter msgIter; - SSubmitBlkIter blkIter; - SMeta* pMeta; - SArray* pColumnIdList; -} STqReadHandle; - -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList); -void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); -bool tqNextDataBlock(STqReadHandle* pHandle); -int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); -// return SArray -SArray* tqRetrieveDataBlock(STqReadHandle* pHandle); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 499972f476..bb0ee8dfc4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -22,7 +22,6 @@ #include "meta.h" #include "tarray.h" #include "tfs.h" -#include "tq.h" #include "tsdb.h" #include "wal.h" @@ -35,6 +34,12 @@ typedef struct SVnode SVnode; typedef struct SDnode SDnode; typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef struct STqCfg { + // TODO + int32_t reserved; +} STqCfg; + + typedef struct SVnodeCfg { int32_t vgId; SDnode *pDnode; @@ -61,6 +66,16 @@ typedef struct { PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeOpt; +typedef struct STqReadHandle { + int64_t ver; + SSubmitMsg* pMsg; + SSubmitBlk* pBlock; + SSubmitMsgIter msgIter; + SSubmitBlkIter blkIter; + SMeta* pMeta; + SArray* pColumnIdList; +} STqReadHandle; + /* ------------------------ SVnode ------------------------ */ /** * @brief Initialize the vnode module @@ -180,6 +195,21 @@ int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +/* ------------------------- TQ QUERY -------------------------- */ + +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta); + +static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) { + pReadHandle->pColumnIdList = pColumnIdList; +} + +void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); +bool tqNextDataBlock(STqReadHandle* pHandle); +int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); +// return SArray +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index b4e1f57384..2b4200fce5 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -17,6 +17,7 @@ #define _TD_TQ_INT_H_ #include "tq.h" +#include "meta.h" #include "tlog.h" #include "trpc.h" #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index be32ed6829..1fa65b2a73 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -24,6 +24,7 @@ #include "tlockfree.h" #include "tmacro.h" #include "wal.h" +#include "tq.h" #include "vnode.h" diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 52c541dcfd..eca02c867c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,7 +50,7 @@ void tqCleanUp() { taosTmrCleanUp(tqMgmt.timer); } -STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -58,6 +58,8 @@ STQ* tqOpen(const char* path, SWal* pWal, STqCfg* tqConfig, SMemAllocatorFactory } pTq->path = strdup(path); pTq->tqConfig = tqConfig; + pTq->pWal = pWal; + pTq->pMeta = pMeta; #if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); @@ -610,48 +612,52 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SMqCVConsumeReq* pReq = pMsg->pCont; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; - int64_t offset = pReq->offset; + int64_t reqOffset = pReq->offset; + int64_t fetchOffset = reqOffset; int64_t blockingTime = pReq->blockingTime; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); int sz = taosArrayGetSize(pConsumer->topics); for (int i = 0 ; i < sz; i++) { - STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i); + STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i); - int8_t pos = offset % TQ_BUFFER_SIZE; - int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); + int8_t pos = fetchOffset % TQ_BUFFER_SIZE; + int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); if (old == 1) { // do nothing continue; } - if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { - // TODO + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + return -1; } - SWalHead* pHead = pHandle->pReadhandle->pHead; - while (pHead->head.msgType != TDMT_VND_SUBMIT) { + SWalHead* pHead = pTopic->pReadhandle->pHead; + while (1) { // read until find TDMT_VND_SUBMIT + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + return -1; + } } SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; - void* task = pHandle->buffer.output[pos].task; + void* task = pTopic->buffer.output[pos].task; - qStreamExecTaskSetInput(task, pCont); + qSetStreamInput(task, pCont); SSDataBlock* pDataBlock; uint64_t ts; if (qExecTask(task, &pDataBlock, &ts) < 0) { } // TODO: launch query and get output data - pHandle->buffer.output[pos].dst = pDataBlock; - if (pHandle->buffer.firstOffset == -1 - || pReq->offset < pHandle->buffer.firstOffset) { - pHandle->buffer.firstOffset = pReq->offset; + pTopic->buffer.output[pos].dst = pDataBlock; + if (pTopic->buffer.firstOffset == -1 + || pReq->offset < pTopic->buffer.firstOffset) { + pTopic->buffer.firstOffset = pReq->offset; } - if (pHandle->buffer.lastOffset == -1 - || pReq->offset > pHandle->buffer.lastOffset) { - pHandle->buffer.lastOffset = pReq->offset; + if (pTopic->buffer.lastOffset == -1 + || pReq->offset > pTopic->buffer.lastOffset) { + pTopic->buffer.lastOffset = pReq->offset; } - atomic_store_8(&pHandle->buffer.output[pos].status, 1); + atomic_store_8(&pTopic->buffer.output[pos].status, 1); // put output into rsp } @@ -681,16 +687,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; + pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); + if (pTopic->pReadhandle == NULL) { + + } for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL); + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, pReadHandle); } - pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); // write mq meta return 0; } -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) { +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); if (pReadHandle == NULL) { return NULL; @@ -698,7 +708,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SArray* pColumnIdList) { pReadHandle->pMeta = pMeta; pReadHandle->pMsg = NULL; pReadHandle->ver = -1; - pReadHandle->pColumnIdList = pColumnIdList; + pReadHandle->pColumnIdList = NULL; return NULL; } diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index c4bbd93eda..6bbf3b959d 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -127,7 +127,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, pVnode->pWal, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ccc1620264..55f6b75fa2 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -62,10 +62,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle } // print those info into log - pMsg->sId = be64toh(pMsg->sId); - pMsg->queryId = be64toh(pMsg->queryId); - pMsg->taskId = be64toh(pMsg->taskId); - pMsg->contentLen = ntohl(pMsg->contentLen); + pMsg->sId = pMsg->sId; + pMsg->queryId = pMsg->queryId; + pMsg->taskId = pMsg->taskId; + pMsg->contentLen = pMsg->contentLen; struct SSubplan* plan = NULL; int32_t code = qStringToSubplan(pMsg->msg, &plan); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3b01c319e4..d9b3f5f4a9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -27,6 +27,7 @@ #include "thash.h" #include "ttypes.h" #include "query.h" +#include "vnode.h" #include "tsdb.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) @@ -5425,8 +5426,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); } - // TODO set the extract column id to streamHandle - // pColList + // set the extract column id to streamHandle + tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); pInfo->readerHandle = streamReadHandle; @@ -5438,6 +5439,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp pOperator->numOfOutput = numOfOutput; pOperator->exec = doStreamBlockScan; pOperator->pTaskInfo = pTaskInfo; + return pOperator; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ddfa73f0a5..46450d78ea 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1485,11 +1485,11 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { pMsg->header.vgId = htonl(tInfo.addr.nodeId); - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(plan->id.queryId); - pMsg->taskId = htobe64(schGenUUID()); + pMsg->sId = schMgmt.sId; + pMsg->queryId = plan->id.queryId; + pMsg->taskId = schGenUUID(); pMsg->taskType = TASK_TYPE_PERSISTENT; - pMsg->contentLen = htonl(msgLen); + pMsg->contentLen = msgLen; memcpy(pMsg->msg, msg, msgLen); tInfo.msg = pMsg; From 99984adb45d784b086f6362c7104062ccbb98a88 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 16:13:11 +0800 Subject: [PATCH 2/2] fix dependency for vnode --- include/util/tcoding.h | 1 - source/libs/executor/src/executor.c | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 4bb0bde4bb..c105ce1ab9 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -375,7 +375,6 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) { static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t valueLen) { int tlen = 0; - tlen += taosEncodeVariantI32(buf, valueLen); if (buf != NULL) { memcpy(*buf, value, valueLen); *buf = POINTER_SHIFT(*buf, valueLen); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 55f6b75fa2..b6683e6043 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -62,10 +62,12 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle } // print those info into log +#if 0 pMsg->sId = pMsg->sId; pMsg->queryId = pMsg->queryId; pMsg->taskId = pMsg->taskId; pMsg->contentLen = pMsg->contentLen; +#endif struct SSubplan* plan = NULL; int32_t code = qStringToSubplan(pMsg->msg, &plan);