diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 88e40b247b..0eb87df9b0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,8 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, + int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState pState = streamTaskGetStatus(pTask); @@ -95,17 +96,53 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } +static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, + SArray* pRes) { + SSDataBlock block = {0}; + int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); + if (num != 1) { + stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); + return TSDB_CODE_INVALID_PARA; + } + + void* p = taosArrayGet(pRetrieveBlock->blocks, 0); + int32_t code = assignOneDataBlock(&block, p); + if (code) { + stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + block.info.type = STREAM_PULL_OVER; + block.info.childId = pTask->info.selfChildId; + + p = taosArrayPush(pRes, &block); + if (p != NULL) { + (*pNumOfBlocks) += 1; + stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pRetrieveBlock->reqId); + } else { + code = terrno; + stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr, + pRetrieveBlock->reqId, tstrerror(code)); + } + + return code; +} + int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { - int32_t code = TSDB_CODE_SUCCESS; - void* pExecutor = pTask->exec.pExecutor; int32_t size = 0; int32_t numOfBlocks = 0; + int32_t code = TSDB_CODE_SUCCESS; + void* pExecutor = pTask->exec.pExecutor; SArray* pRes = NULL; *totalBlocks = 0; *totalSize = 0; while (1) { + SSDataBlock* output = NULL; + uint64_t ts = 0; + if (pRes == NULL) { pRes = taosArrayInit(4, sizeof(SSDataBlock)); } @@ -115,8 +152,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* return code; } - SSDataBlock* output = NULL; - uint64_t ts = 0; if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { resetTaskInfo(pExecutor); @@ -124,6 +159,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) { stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code)); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } else { qResetTaskCode(pExecutor); @@ -133,33 +169,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; - - int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); - if (num != 1) { - stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); - continue; - } - - code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); - if (code) { - stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); - continue; - } - - block.info.type = STREAM_PULL_OVER; - block.info.childId = pTask->info.selfChildId; - - void* p = taosArrayPush(pRes, &block); - if (p != NULL) { - numOfBlocks += 1; - } else { - stError("s-task:%s failed to add retrieve block", pTask->id.idStr); - } - - stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, - pTask->info.selfChildId, pRetrieveBlock->reqId); + code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes); + if (code) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return code; + } } break; @@ -189,11 +203,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* void* p = taosArrayPush(pRes, &block); if (p == NULL) { stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr); + } else { + stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); } - stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); - // current output should be dispatched to down stream nodes if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); @@ -303,7 +317,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr); return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } @@ -408,7 +422,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { } } else { if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING || - status == TASK_STATUS__STOP)) { + status == TASK_STATUS__STOP)) { stError("s-task:%s invalid task status:%d", id, status); return TSDB_CODE_STREAM_INTERNAL_ERROR; } @@ -718,7 +732,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); - if(code) { + if (code) { stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code)); } @@ -833,7 +847,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (pState.state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue - } else { // todo refactor + } else { // todo refactor if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); } else {