diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 09143dde29..ed56b7e6b2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -166,6 +166,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); +void qResetTaskCode(qTaskInfo_t tinfo); + void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); /** diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 62932513f4..9e33a3d890 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1633,3 +1633,11 @@ int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) { pTaskInfo->pRoot->fpSet.reloadStreamStateFn(pTaskInfo->pRoot); return 0; } + +void qResetTaskCode(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + + int32_t code = pTaskInfo->code; + pTaskInfo->code = 0; + qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTaskInfo->id.taskId, tstrerror(code)); +} diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5bb9c993de..bf4567a273 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -244,9 +244,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* SSDataBlock* output = NULL; uint64_t ts = 0; code = qExecTask(exec, &output, &ts); - if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { // if out of memory occurs, quit stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr, tstrerror(code)); + qResetTaskCode(exec); continue; }