From b3d57e4a2de66806af596bb37a9523d506b58fc1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Nov 2023 19:12:36 +0800 Subject: [PATCH] fix(stream): not transfer state if state is not appropriate. --- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 ---- source/libs/stream/src/streamExec.c | 16 +++++++++++----- source/util/src/terror.c | 3 ++- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 124c8bac4c..8457f26967 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -804,6 +804,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) #define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) +#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 13dec9ca54..4802988f18 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1164,10 +1164,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - if (pTask->info.fillHistory == 1) { - ASSERT(pTask->status.pauseAllowed == true); - } - streamScanHistoryData(pTask); double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7064e48641..49f691c558 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -285,13 +285,14 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; + const char* id = pTask->id.idStr; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { stError( "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " "fill-history task", - pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); + id, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); @@ -304,7 +305,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamMetaWUnLock(pMeta); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, + stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id, pStreamTask->id.idStr); } @@ -318,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); - stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id); } // wait for the stream task to handle all in the inputQ, and to be idle @@ -328,7 +329,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. char* p = NULL; - streamTaskGetStatus(pStreamTask, &p); + status = streamTaskGetStatus(pStreamTask, &p); + if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) { + stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p); + streamMetaReleaseTask(pMeta, pStreamTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. @@ -351,7 +357,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 3. resume the state of stream task, after this function, the stream task will run immediately. streamTaskResume(pStreamTask); - stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id); // 4. free it and remove fill-history task from disk meta-store streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b5a4fc4008..92ab56b71b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -665,7 +665,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS,"Invalid task status to proceed") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")