diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e71a6c4dce..b77c8535f1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -673,7 +673,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt); int32_t streamExecTask(SStreamTask* pTask); -void streamResumeTask(SStreamTask* pTask); +int32_t streamResumeTask(SStreamTask* pTask); int32_t streamTrySchedExec(SStreamTask* pTask); int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); void streamTaskResumeInFuture(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f3279a0f01..4fc00a6f59 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); -static void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState pState = streamTaskGetStatus(pTask); @@ -95,7 +95,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } -void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { +int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; int32_t size = 0; @@ -112,7 +112,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to if (streamTaskShouldStop(pTask) || (pRes == NULL)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return; + return code; } SSDataBlock* output = NULL; @@ -122,8 +122,13 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to resetTaskInfo(pExecutor); } - stError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code)); - continue; + if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) { + stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code)); + return code; + } else { + qResetTaskCode(pExecutor); + continue; + } } if (output == NULL) { @@ -194,7 +199,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); // todo: here we need continue retry to put it into output buffer if (code != TSDB_CODE_SUCCESS) { - return; + return code; } pRes = NULL; @@ -208,6 +213,8 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } + + return code; } // todo contiuous try to create result blocks @@ -627,7 +634,7 @@ static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, } } -static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { +static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { const char* id = pTask->id.idStr; int32_t blockSize = 0; int64_t st = taosGetTimestampMs(); @@ -635,23 +642,28 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i int64_t ver = pInfo->processedVer; int64_t totalSize = 0; int32_t totalBlocks = 0; + int32_t code = 0; stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type)); - int32_t code = doSetStreamInputBlock(pTask, pBlock, &ver, id); + code = doSetStreamInputBlock(pTask, pBlock, &ver, id); if (code) { stError("s-task:%s failed to set input block, not exec for these blocks", id); - return; + return code; + } + + code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); + if (code) { + return code; } - streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr); // update the currentVer if processing the submit blocks. if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id, pInfo->checkpointVer, pInfo->nextProcessVer, ver); - return; + return code; } if (ver != pInfo->processedVer) { @@ -660,6 +672,8 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); pInfo->processedVer = ver; } + + return code; } void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { @@ -712,6 +726,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB */ static int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; + int32_t code = 0; // merge multiple input data if possible in the input queue. stDebug("s-task:%s start to extract data block from inputQ", id); @@ -784,9 +799,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (type == STREAM_INPUT__DATA_BLOCK) { pTask->execInfo.sink.dataSize += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); - int32_t code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); + code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); if (code != TSDB_CODE_SUCCESS) { - // todo handle error. + return code; } double el = (taosGetTimestampMs() - st) / 1000.0; @@ -801,17 +816,19 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (type != STREAM_INPUT__CHECKPOINT) { - doStreamTaskExecImpl(pTask, pInput, numOfBlocks); + code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks); streamFreeQitem(pInput); + if (code) { + return code; + } } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. streamMutexLock(&pTask->lock); SStreamTaskState pState = streamTaskGetStatus(pTask); if (pState.state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); - int32_t code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue + code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue } else { // todo refactor - int32_t code = 0; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); } else { @@ -827,7 +844,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { streamMutexUnlock(&pTask->lock); streamFreeQitem(pInput); - return 0; + return code; } } } @@ -858,21 +875,21 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } -void streamResumeTask(SStreamTask* pTask) { +int32_t streamResumeTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; int32_t code = 0; if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { - stError("s-task:%s invalid sched status:%d, not resume task", id, pTask->status.schedStatus); - return; + stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus); + return code; } while (1) { code = doStreamExecTask(pTask); if (code) { stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); + return code; } - // check if continue streamMutexLock(&pTask->lock); @@ -888,7 +905,7 @@ void streamResumeTask(SStreamTask* pTask) { stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p, pTask->status.schedStatus, pTask->status.lastExecTs); - return; + return code; } else { // check if this task needs to be idle for a while if (pTask->status.schedIdleTime > 0) { @@ -896,28 +913,31 @@ void streamResumeTask(SStreamTask* pTask) { streamMutexUnlock(&pTask->lock); setLastExecTs(pTask, taosGetTimestampMs()); - return; + return code; } } streamMutexUnlock(&pTask->lock); } + + return code; } int32_t streamExecTask(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. const char* id = pTask->id.idStr; + int32_t code = 0; int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); if (schedStatus == TASK_SCHED_STATUS__WAITING) { - streamResumeTask(pTask); + code = streamResumeTask(pTask); } else { char* p = streamTaskGetStatus(pTask).name; stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p, pTask->status.schedStatus); } - return 0; + return code; } int32_t streamTaskReleaseState(SStreamTask* pTask) {