diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e4dc0b5854..5300792338 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -98,7 +98,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, SArray* pRes) { - SSDataBlock block = {.info.type = STREAM_PULL_OVER, .info.childId = pTask->info.selfChildId}; + SSDataBlock block = {0}; int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); if (num != 1) { stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); @@ -112,6 +112,9 @@ static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, return code; } + block.info.type = STREAM_PULL_OVER; + block.info.childId = pTask->info.selfChildId; + p = taosArrayPush(pRes, &block); if (p != NULL) { (*pNumOfBlocks) += 1; @@ -171,6 +174,33 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } +// SSDataBlock block = {0}; +// const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; +// +// int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); +// if (num != 1) { +// stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); +// continue; +// } +// +// code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); +// if (code) { +// stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); +// continue; +// } +// +// block.info.type = STREAM_PULL_OVER; +// block.info.childId = pTask->info.selfChildId; +// +// void* p = taosArrayPush(pRes, &block); +// if (p != NULL) { +// numOfBlocks += 1; +// } else { +// stError("s-task:%s failed to add retrieve block", pTask->id.idStr); +// } +// +// stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, +// pTask->info.selfChildId, pRetrieveBlock->reqId); } break; @@ -185,13 +215,15 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* continue; // checkpoint block not dispatch to downstream tasks } - SSDataBlock block = {.info.childId = pTask->info.selfChildId}; + SSDataBlock block = {0}; code = assignOneDataBlock(&block, output); if (code) { stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); continue; } + block.info.childId = pTask->info.selfChildId; + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); numOfBlocks += 1;