From 8833a8bdfba45277df7debcc80863a39debac230 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 25 Jul 2022 21:39:12 +0800 Subject: [PATCH 1/2] fix: fix bug for TD-17801 --- source/libs/executor/src/projectoperator.c | 20 ++++++++++++++++---- source/libs/stream/src/streamExec.c | 6 ++++-- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 34149d7499..0ddbfbd4d7 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -68,9 +68,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; // todo remove it soon - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pInfo->mergeDataBlocks = true; - } + // if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + // pInfo->mergeDataBlocks = true; + // } int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -181,6 +181,16 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS return PROJECT_RETRIEVE_DONE; } +void printDataBlock1(SSDataBlock* pBlock, const char* flag) { + if (!pBlock || pBlock->info.rows == 0) { + qDebug("===stream===printDataBlock: Block is Null or Empty"); + return; + } + char* pBuf = NULL; + qDebug("%s", dumpBlockData(pBlock, flag, &pBuf)); + taosMemoryFree(pBuf); +} + SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; @@ -229,6 +239,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // for stream interval if (pBlock->info.type == STREAM_RETRIEVE) { + printDataBlock1(pBlock, "project1"); return pBlock; } @@ -302,7 +313,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - + + printDataBlock1(p, "project"); return (p->info.rows > 0) ? p : NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 52b610228e..f782de95b9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -26,7 +26,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); + qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; @@ -72,6 +72,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) continue; } + qDebug("task %d(child %d) executed and get block"); + SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; @@ -188,7 +190,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (pTask->execType == TASK_EXEC__NONE) { ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); streamTaskOutput(pTask, data); - return pRes; + continue; } qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); From 23f0feca92a4554286ef1d85845ba09766c4263e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Jul 2022 21:50:51 +0800 Subject: [PATCH 2/2] Update projectoperator.c --- source/libs/executor/src/projectoperator.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4b0aa71004..1edc086551 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -190,7 +190,7 @@ void printDataBlock1(SSDataBlock* pBlock, const char* flag) { } char* pBuf = NULL; qDebug("%s", dumpBlockData(pBlock, flag, &pBuf)); - taosMemoryFree(pBuf); + taosMemoryFreeClear(pBuf); } SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { @@ -241,7 +241,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // for stream interval if (pBlock->info.type == STREAM_RETRIEVE) { - printDataBlock1(pBlock, "project1"); + // printDataBlock1(pBlock, "project1"); return pBlock; } @@ -316,7 +316,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - printDataBlock1(p, "project"); + // printDataBlock1(p, "project"); return (p->info.rows > 0) ? p : NULL; } @@ -601,4 +601,4 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { } return (pRes->info.rows > 0) ? pRes : NULL; -} \ No newline at end of file +}