From e7105edaa44d08cac986ea3dc7427dbcb2ab6aea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 14:14:44 +0800 Subject: [PATCH] fix(stream): disable pause if task is un-init. --- source/dnode/mnode/impl/src/mndStream.c | 44 ++++++++++++++++++++- source/dnode/vnode/src/tqCommon/tqCommon.c | 38 +++++++++++++----- source/libs/stream/src/streamCheckStatus.c | 5 +-- source/libs/stream/src/streamStartHistory.c | 1 - source/libs/stream/src/streamTask.c | 2 +- source/libs/stream/src/streamTaskSm.c | 4 +- 6 files changed, 76 insertions(+), 18 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c9598c4b38..a0fb8ae40a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1075,7 +1075,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); taosThreadMutexUnlock(&execInfo.lock); - return 0; + return true; } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); @@ -1911,9 +1911,51 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { bool updated = taskNodeIsUpdated(pMnode); if (updated) { mError("tasks are not ready for pause, node update detected"); + sdbRelease(pMnode->pSdb, pStream); return -1; } + + { // check for tasks, if tasks are not ready, not allowed to pause + bool found = false; + bool readyToPause = true; + taosThreadMutexLock(&execInfo.lock); + + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->id.streamId != pStream->uid) { + continue; + } + + if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) { + mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%x in checkpoint/uninit status, not ready for pause", + pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId); + readyToPause = false; + } + + found = true; + } + + taosThreadMutexUnlock(&execInfo.lock); + if (!found) { + mError("stream:%s task not report status yet, not ready for pause", pauseReq.name); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + if (!readyToPause) { + mError("stream:%s task not ready for pause yet", pauseReq.name); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + } + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5e28568763..688be61015 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -997,11 +997,19 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t ETaskStatus status = streamTaskGetStatus(pTask)->state; int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SINK && pTask->info.fillHistory == 0) { - if (status == TASK_STATUS__UNINIT) { - tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); - tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); - } + if (level == TASK_LEVEL__SINK) { + ASSERT (status != TASK_STATUS__UNINIT); /*{ +// tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr); +// +// if (pTask->pBackend == NULL) { // TODO: add test cases for this +// int32_t code = pMeta->expandTaskFn(pTask); +// if (code != TSDB_CODE_SUCCESS) { +// tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId); +// streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); +// } +// } +// int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + }*/ streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1027,11 +1035,21 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else { streamTrySchedExec(pTask); } - } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? - if (pTask->info.fillHistory == 0) { - tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); - tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); - } + } else { + ASSERT (status != TASK_STATUS__UNINIT);// { // todo: fill-history task init ? +// if (pTask->info.fillHistory == 0) { + // tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr); + // + // if (pTask->pBackend == NULL) { // TODO: add test cases for this + // int32_t code = pMeta->expandTaskFn(pTask); + // if (code != TSDB_CODE_SUCCESS) { + // tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId); + // streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); + // } + // } + // int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); +// } +// } } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index b64e0bb6d2..1728147c11 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -58,8 +58,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage < stage) { - stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 - ", prev:%" PRId64, + stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64, id, upstreamTaskId, vgId, stage, pInfo->stage); // record the checkpoint failure id and sent to mnode taosThreadMutexLock(&pTask->lock); @@ -170,13 +169,13 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); if (pTask != NULL) { pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); - streamMetaReleaseTask(pMeta, pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask); stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); + streamMetaReleaseTask(pMeta, pTask); } else { pRsp->status = TASK_DOWNSTREAM_NOT_READY; stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 050d88aaf1..adf4c3bef9 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -240,7 +240,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { checkFillhistoryTaskStatus(pTask, pHisTask); } - } streamMetaReleaseTask(pMeta, pHisTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7d869ce538..4a6e98f4a0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -836,7 +836,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { SStreamMeta* pMeta = pTask->pMeta; int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num); // in case of fill-history task, stop the tsdb file scan operation. if (pTask->info.fillHistory == 1) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 82ea2f88ef..75d62ff324 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -623,9 +623,9 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info); taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);