From a1c6c94c0b3558b99a06965d2fd40e4a47fc3ef7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 19:00:09 +0800 Subject: [PATCH] consume skip ununsed table --- include/common/tmsg.h | 4 +- source/dnode/vnode/inc/vnode.h | 11 ++- source/dnode/vnode/src/tq/tq.c | 114 +++++++++++++++++----------- source/libs/executor/src/executor.c | 1 + 4 files changed, 82 insertions(+), 48 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 62f55609ce..bb53c6ddfa 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1590,8 +1590,8 @@ typedef struct SMqCVConsumeReq { typedef struct SMqConsumeRspBlock { int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char body[]; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char body[]; } SMqConsumeRspBlock; typedef struct SMqCVConsumeRsp { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index bb0ee8dfc4..ab538ff12d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -68,12 +68,13 @@ typedef struct { typedef struct STqReadHandle { int64_t ver; + int64_t tbUid; SSubmitMsg* pMsg; SSubmitBlk* pBlock; SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; SMeta* pMeta; - SArray* pColumnIdList; + SArray* pColIdList; } STqReadHandle; /* ------------------------ SVnode ------------------------ */ @@ -199,8 +200,12 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta); -static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColumnIdList) { - pReadHandle->pColumnIdList = pColumnIdList; +static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { + pReadHandle->pColIdList = pColIdList; +} + +static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { + pHandle->tbUid = tbUid; } void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eca02c867c..4a6f55564c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,6 +15,7 @@ #include "tqInt.h" #include "tqMetaStore.h" +#include "tcompare.h" // static // read next version data @@ -424,7 +425,7 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ /*if (code < 0) {*/ - // TODO: error + // TODO: error /*}*/ // get msgType // if submitblk @@ -610,61 +611,91 @@ int tqItemSSize() { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SMqCVConsumeReq* pReq = pMsg->pCont; + SRpcMsg rpcMsg; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; int64_t reqOffset = pReq->offset; int64_t fetchOffset = reqOffset; int64_t blockingTime = pReq->blockingTime; + int rspLen = 0; + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - int sz = taosArrayGetSize(pConsumer->topics); + int sz = taosArrayGetSize(pConsumer->topics); - for (int i = 0 ; i < sz; i++) { - STqTopicHandle *pTopic = taosArrayGet(pConsumer->topics, i); + for (int i = 0; i < sz; i++) { + STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); - int8_t pos = fetchOffset % TQ_BUFFER_SIZE; - int8_t old = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); - if (old == 1) { - // do nothing - continue; - } - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - return -1; - } - SWalHead* pHead = pTopic->pReadhandle->pHead; + int8_t pos; + int8_t skip = 0; + SWalHead* pHead; while (1) { - // read until find TDMT_VND_SUBMIT - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - return -1; + pos = fetchOffset % TQ_BUFFER_SIZE; + skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); + if (skip == 1) { + // do nothing + break; } + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + // check err + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + skip = 1; + break; + } + // read until find TDMT_VND_SUBMIT + pHead = pTopic->pReadhandle->pHead; + if (pHead->head.msgType == TDMT_VND_SUBMIT) { + break; + } + if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + skip = 1; + break; + } + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + fetchOffset++; } + if (skip == 1) continue; SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; - void* task = pTopic->buffer.output[pos].task; + qTaskInfo_t task = pTopic->buffer.output[pos].task; qSetStreamInput(task, pCont); - SSDataBlock* pDataBlock; - uint64_t ts; - if (qExecTask(task, &pDataBlock, &ts) < 0) { + //SArray + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + break; + } + if (pDataBlock != NULL) { + taosArrayPush(pRes, pDataBlock); + } else { + break; + } } - // TODO: launch query and get output data - pTopic->buffer.output[pos].dst = pDataBlock; - if (pTopic->buffer.firstOffset == -1 - || pReq->offset < pTopic->buffer.firstOffset) { + + atomic_store_8(&pTopic->buffer.output[pos].status, 0); + + if (taosArrayGetSize(pRes) == 0) { + taosArrayDestroy(pRes); + fetchOffset++; + continue; + } + + pTopic->buffer.output[pos].dst = pRes; + if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { pTopic->buffer.firstOffset = pReq->offset; } - if (pTopic->buffer.lastOffset == -1 - || pReq->offset > pTopic->buffer.lastOffset) { + if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { pTopic->buffer.lastOffset = pReq->offset; } - atomic_store_8(&pTopic->buffer.output[pos].status, 1); - // put output into rsp } // launch query // get result - SMqCvConsumeRsp* pRsp; return 0; } @@ -673,14 +704,14 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { if (pConsumer == NULL) { return -1; } - + STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); if (pTopic == NULL) { free(pConsumer); return -1; } - strcpy(pTopic->topicName, pReq->topicName); - strcpy(pTopic->cgroup, pReq->cgroup); + strcpy(pTopic->topicName, pReq->topicName); + strcpy(pTopic->cgroup, pReq->cgroup); strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan); @@ -689,7 +720,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { pTopic->buffer.lastOffset = -1; pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); if (pTopic->pReadhandle == NULL) { - } for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; @@ -708,7 +738,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->pMeta = pMeta; pReadHandle->pMsg = NULL; pReadHandle->ver = -1; - pReadHandle->pColumnIdList = NULL; + pReadHandle->pColIdList = NULL; return NULL; } @@ -720,20 +750,18 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve } bool tqNextDataBlock(STqReadHandle* pHandle) { - if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { - return false; + while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) { + if (pHandle->tbUid == pHandle->pBlock->uid) return true; } - return true; + return false; } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - SMemRow row; - int32_t sversion = pHandle->pBlock->sversion; - SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); - pBlockInfo->numOfCols = pSchema->nCols; + /*int32_t sversion = pHandle->pBlock->sversion;*/ + /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; - // TODO: filter out unused column return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b6683e6043..b4aa506df3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -29,6 +29,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input) { return doSetStreamBlock(pOperator->pDownstream[0], input); } + return TSDB_CODE_QRY_APP_ERROR; } else { SStreamBlockScanInfo* pInfo = pOperator->info; tqReadHandleSetMsg(pInfo->readerHandle, input, 0);