fix(stream): set the correct res block info.

This commit is contained in:
Haojun Liao 2024-09-27 19:22:53 +08:00
parent f6f979e6ea
commit 75a6645925
1 changed files with 34 additions and 2 deletions

View File

@ -98,7 +98,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock,
SArray* pRes) { SArray* pRes) {
SSDataBlock block = {.info.type = STREAM_PULL_OVER, .info.childId = pTask->info.selfChildId}; SSDataBlock block = {0};
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
if (num != 1) { if (num != 1) {
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
@ -112,6 +112,9 @@ static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks,
return code; return code;
} }
block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->info.selfChildId;
p = taosArrayPush(pRes, &block); p = taosArrayPush(pRes, &block);
if (p != NULL) { if (p != NULL) {
(*pNumOfBlocks) += 1; (*pNumOfBlocks) += 1;
@ -171,6 +174,33 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code; return code;
} }
// SSDataBlock block = {0};
// const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
//
// int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
// if (num != 1) {
// stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
// continue;
// }
//
// code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
// if (code) {
// stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code));
// continue;
// }
//
// block.info.type = STREAM_PULL_OVER;
// block.info.childId = pTask->info.selfChildId;
//
// void* p = taosArrayPush(pRes, &block);
// if (p != NULL) {
// numOfBlocks += 1;
// } else {
// stError("s-task:%s failed to add retrieve block", pTask->id.idStr);
// }
//
// stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr,
// pTask->info.selfChildId, pRetrieveBlock->reqId);
} }
break; break;
@ -185,13 +215,15 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
continue; // checkpoint block not dispatch to downstream tasks continue; // checkpoint block not dispatch to downstream tasks
} }
SSDataBlock block = {.info.childId = pTask->info.selfChildId}; SSDataBlock block = {0};
code = assignOneDataBlock(&block, output); code = assignOneDataBlock(&block, output);
if (code) { if (code) {
stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
continue; continue;
} }
block.info.childId = pTask->info.selfChildId;
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1; numOfBlocks += 1;