From fcb058cee4a8681b7d7351ade53dfab532a428ed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 21 Mar 2022 16:15:51 +0800 Subject: [PATCH] [td-13039] support scan ssdatablock. --- include/libs/executor/executor.h | 6 ++++- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/executor/inc/executorimpl.h | 2 ++ source/libs/executor/src/executor.c | 34 ++++++++++++++++++++----- source/libs/executor/src/executorimpl.c | 13 ++++++++-- 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d4af51fc21..b08ee5303d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,6 +32,9 @@ typedef struct SReadHandle { void* meta; } SReadHandle; +#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 +#define STREAM_DATA_TYPE_SSDAT_BLOCK 0x2 + /** * Create the exec task for streaming mode * @param pMsg @@ -44,9 +47,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); * Set the input data block for the stream scan. * @param tinfo * @param input + * @param type * @return */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); /** * Update the table id list, add or remove. diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 02fecb49b7..b94e7d7c03 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -282,7 +282,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->head.msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; qTaskInfo_t task = pTopic->buffer.output[pos].task; - qSetStreamInput(task, pCont); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { SSDataBlock* pDataBlock; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0399a78b09..fd0c6bd675 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -422,6 +422,8 @@ typedef struct STagScanInfo { typedef struct SStreamBlockScanInfo { SSDataBlock* pRes; // result SSDataBlock + int32_t blockType; // current block type + bool blockValid; // Is current data has returned? SColumnInfo* pCols; // the output column info uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a8602b7c77..e6cdbcf10f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -18,7 +18,7 @@ #include "planner.h" #include "tq.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -31,18 +31,40 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) return TSDB_CODE_QRY_APP_ERROR; } - return doSetStreamBlock(pOperator->pDownstream[0], input, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); } else { SStreamBlockScanInfo* pInfo = pOperator->info; - if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { - qError("submit msg messed up when initing stream block, %s" PRIx64, id); + + // the block type can not be changed in the streamscan operators + if (pInfo->blockType == 0) { + pInfo->blockType = type; + } else if (pInfo->blockType != type) { return TSDB_CODE_QRY_APP_ERROR; } + + if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + qError("submit msg messed up when initing stream block, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } + } else { + ASSERT(!pInfo->blockValid); + + SSDataBlock* pDataBlock = input; + pInfo->pRes->info = pDataBlock->info; + for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { + pInfo->pRes->pDataBlock = pDataBlock->pDataBlock; + } + + // set current block valid. + pInfo->blockValid = true; + } + return TSDB_CODE_SUCCESS; } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } @@ -53,7 +75,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 947fb08ff9..befb210b80 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4869,17 +4869,26 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo *pOperator, bool* newgroup) { } static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) { - // NOTE: this operator never check if current status is done or not + // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; + if (pInfo->blockType == STREAM_DATA_TYPE_SSDAT_BLOCK) { + if (pInfo->blockValid) { + pInfo->blockValid = false; // this block can only be used once. + return pInfo->pRes; + } else { + return NULL; + } + } + pTaskInfo->code = pOperator->_openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - pBlockInfo->rows = 0; + blockDataClearup(pInfo->pRes); while (tqNextDataBlock(pInfo->readerHandle)) { pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);