diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 9cafb4ee04..5379a8f712 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -61,7 +61,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); * @param type * @return */ -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid); /** * Set multiple input data blocks for the stream scan. @@ -71,7 +71,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); * @param type * @return */ -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid); /** * Update the table id list, add or remove. diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 88af049d0b..df10d9d533 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -374,7 +374,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 smaDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, taskInfo, suid); - qSetStreamInput(taskInfo, pMsg, inputType); + qSetStreamInput(taskInfo, pMsg, inputType, true); while (1) { SSDataBlock *output = NULL; uint64_t ts; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9941b00ff7..192016166a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -264,7 +264,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); - qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -510,7 +510,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 18cf18dbad..2819ad3249 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -2015,7 +2015,7 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, suid); - qSetStreamInput(taskInfo, pMsg, inputType); + qSetStreamInput(taskInfo, pMsg, inputType, true); while (1) { SSDataBlock *output = NULL; uint64_t ts; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc12f47c65..c96eb57fa8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -371,6 +371,7 @@ typedef struct SessionWindowSupporter { SStreamAggSupporter* pStreamAggSup; int64_t gap; } SessionWindowSupporter; + typedef struct SStreamBlockScanInfo { SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock @@ -379,7 +380,6 @@ typedef struct SStreamBlockScanInfo { int32_t blockType; // current block type int32_t validBlockIndex; // Is current data has returned? SColumnInfo* pCols; // the output column info - uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times void* streamBlockReader;// stream block reader handle SArray* pColMatchInfo; // @@ -394,8 +394,9 @@ typedef struct SStreamBlockScanInfo { SOperatorInfo* pOperatorDumy; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SCatchSupporter childAggSup; - SArray* childIds; + SArray* childIds; SessionWindowSupporter sessionSup; + bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2811c8dce8..fd62849e56 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -19,7 +19,7 @@ #include "tdatablock.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -32,11 +32,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return TSDB_CODE_QRY_APP_ERROR; } pOperator->status = OP_NOT_OPENED; - return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); + return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id); } else { pOperator->status = OP_NOT_OPENED; SStreamBlockScanInfo* pInfo = pOperator->info; + pInfo->assignBlockUid = assignUid; // the block type can not be changed in the streamscan operators if (pInfo->blockType == 0) { @@ -67,11 +68,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { - return qSetMultiStreamInput(tinfo, input, 1, type); +int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { + return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); } -int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { +int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; } @@ -82,7 +83,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ceece4f9b5..63c6f46083 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -878,6 +878,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pInfo->pRes->info.uid = uid; pInfo->pRes->info.type = STREAM_NORMAL; + // for generating rollup SMA result, each time is an independent time serie. + // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this + if (pInfo->assignBlockUid) { + pInfo->pRes->info.groupId = uid; + } + int32_t numOfCols = pInfo->pRes->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); @@ -918,7 +924,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // record the scan action. pInfo->numOfExec++; - pInfo->numOfRows += pBlockInfo->rows; + pOperator->resultInfo.totalRows += pBlockInfo->rows; if (rows == 0) { pOperator->status = OP_EXEC_DONE; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index dc0fbf2bbe..7e4f83a693 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -141,13 +141,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); - qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK); + qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); SArray* blocks = pBlock->blocks; - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); } // exec