From 1c94bbb5e4bfe3c714d874f4092a0f7347f9c59c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Apr 2022 16:11:16 +0800 Subject: [PATCH] enh(query): set the status of stream scan operator. --- source/common/src/tmsg.c | 4 ++++ source/libs/executor/src/executor.c | 2 ++ source/libs/executor/src/executorimpl.c | 4 ++++ source/libs/executor/src/scanoperator.c | 11 +++++++++-- source/libs/function/src/builtinsimpl.c | 4 ---- 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5c97743a62..0352999b3b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -52,6 +52,10 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { if (pIter->len == 0) { pIter->len += sizeof(SSubmitReq); } else { + if (pIter->len >= pIter->totalLen) { + ASSERT(0); + } + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); ASSERT(pIter->len > 0); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4af7e563e6..4863b03fb9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -34,6 +34,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { + pOperator->status = OP_NOT_OPENED; + SStreamBlockScanInfo* pInfo = pOperator->info; // the block type can not be changed in the streamscan operators diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 366da58079..87ecdde3ab 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4931,6 +4931,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } #if 0 if (pProjectInfo->existDataBlock) { // TODO refactor diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3050006b7..6b06f3e89b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -539,7 +539,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SStreamBlockScanInfo* pInfo = pOperator->info; pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -547,6 +547,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -560,11 +561,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { terrno = pTaskInfo->code; + pOperator->status = OP_EXEC_DONE; return NULL; } if (pBlockInfo->rows == 0) { - return NULL; + break; } SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); @@ -583,6 +585,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) if (pInfo->pRes->pDataBlock == NULL) { // TODO add log + pOperator->status = OP_EXEC_DONE; pTaskInfo->code = terrno; return NULL; } @@ -594,6 +597,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) pInfo->numOfExec++; pInfo->numOfRows += pBlockInfo->rows; + if (pBlockInfo->rows == 0) { + pOperator->status = OP_EXEC_DONE; + } + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5a32a6ffec..2b1e4b9406 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -866,10 +866,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { } int32_t lastFunction(SqlFunctionCtx *pCtx) { - if (pCtx->order != TSDB_ORDER_DESC) { - return 0; - } - int32_t numOfElems = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);