From 3b8c85f632ae25ca1529a9afbff95d3c952260d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Oct 2023 14:19:57 +0800 Subject: [PATCH] fix(stream): fix bugs caused by refactor sm. --- include/libs/stream/tstream.h | 4 +- source/dnode/vnode/src/tq/tq.c | 6 +-- source/libs/stream/src/streamExec.c | 8 +++- source/libs/stream/src/streamStart.c | 57 +++++++--------------------- 4 files changed, 26 insertions(+), 49 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6c2cec6292..0ca401e3a4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -717,6 +717,7 @@ int32_t streamSchedExec(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pStatus); bool streamTaskShouldPause(const SStreamTask* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); +bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr); @@ -758,8 +759,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); -void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); -void streamTaskResumeFromHalt(SStreamTask* pTask); +void streamTaskResume(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e99503df33..f9abb5f2c9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1365,8 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; - ETaskStatus st = streamTaskGetStatus(pTask, &p); - if (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { + if (streamTaskReadyToRun(pTask, &p)) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); streamExecTask(pTask); @@ -1515,7 +1514,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, return -1; } - streamTaskResume(pTask, pTq->pStreamMeta); + streamTaskResume(pTask); int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SINK) { @@ -1874,6 +1873,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); + CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2b1ea7c911..2d5788fc4d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -352,7 +352,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be // pause, since the pause allowed attribute is not set yet. - streamTaskResumeFromHalt(pStreamTask); // todo refactor: use streamTaskResume. + streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume. stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); @@ -610,6 +610,12 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { status == TASK_STATUS__DROPPING); } +bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { + ETaskStatus st = streamTaskGetStatus(pTask, NULL); + return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__STREAM_SCAN_HISTORY || + st == TASK_STATUS__CK); +} + int32_t streamExecTask(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 66865c8e25..b52d9177b7 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1021,36 +1021,25 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr); } -void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) { - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); +void streamTaskResume(SStreamTask* pTask) { + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + SStreamMeta* pMeta = pTask->pMeta; - if (status == TASK_STATUS__PAUSE) { + if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT) { streamTaskRestoreStatus(pTask); - streamTaskGetStatus(pTask, &p); - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, - p, num); + char* pNew = NULL; + streamTaskGetStatus(pTask, &pNew); + if (status == TASK_STATUS__PAUSE) { + int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); + stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, p, num); + } else { + stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, p); + } } else { - stDebug("s-task:%s status:%s not in pause status, no need to resume", pTask->id.idStr, p); + stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, p); } - -#if 0 - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__PAUSE) { - pTask->status.taskStatus = pTask->status.keepTaskStatus; - pTask->status.keepTaskStatus = TASK_STATUS__READY; - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); - } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); - } else { - stError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); - } -#endif - } // todo fix race condition @@ -1071,24 +1060,6 @@ void streamTaskEnablePause(SStreamTask* pTask) { pTask->status.pauseAllowed = 1; } -void streamTaskResumeFromHalt(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - char* p = NULL; - - ASSERT(streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT); -// int8_t status = pTask->status.taskStatus; -// if (status != TASK_STATUS__HALT) { -// stError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status)); -// return; -// } - -// pTask->status.taskStatus = pTask->status.keepTaskStatus; -// pTask->status.keepTaskStatus = TASK_STATUS__READY; - streamTaskRestoreStatus(pTask); - streamTaskGetStatus(pTask, &p); - stDebug("s-task:%s resume from halt, current status:%s", id, p); -} - int32_t updateTaskReadyInMeta(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta;