From 8833a8bdfba45277df7debcc80863a39debac230 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 25 Jul 2022 21:39:12 +0800 Subject: [PATCH] fix: fix bug for TD-17801 --- source/libs/executor/src/projectoperator.c | 20 ++++++++++++++++---- source/libs/stream/src/streamExec.c | 6 ++++-- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 34149d7499..0ddbfbd4d7 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -68,9 +68,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; // todo remove it soon - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pInfo->mergeDataBlocks = true; - } + // if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + // pInfo->mergeDataBlocks = true; + // } int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -181,6 +181,16 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS return PROJECT_RETRIEVE_DONE; } +void printDataBlock1(SSDataBlock* pBlock, const char* flag) { + if (!pBlock || pBlock->info.rows == 0) { + qDebug("===stream===printDataBlock: Block is Null or Empty"); + return; + } + char* pBuf = NULL; + qDebug("%s", dumpBlockData(pBlock, flag, &pBuf)); + taosMemoryFree(pBuf); +} + SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; @@ -229,6 +239,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // for stream interval if (pBlock->info.type == STREAM_RETRIEVE) { + printDataBlock1(pBlock, "project1"); return pBlock; } @@ -302,7 +313,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - + + printDataBlock1(p, "project"); return (p->info.rows > 0) ? p : NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 52b610228e..f782de95b9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -26,7 +26,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); + qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; @@ -72,6 +72,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) continue; } + qDebug("task %d(child %d) executed and get block"); + SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; @@ -188,7 +190,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (pTask->execType == TASK_EXEC__NONE) { ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); streamTaskOutput(pTask, data); - return pRes; + continue; } qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);