From 4f852704f587be7196df55b38cad848acfe07af4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 17 Dec 2023 02:06:01 +0800 Subject: [PATCH] fix(stream): send checkpoint complete msg to mnode when dropping it. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +---- source/libs/stream/src/streamDispatch.c | 17 +++++++++++----- source/libs/stream/src/streamTask.c | 23 +++++++++++++--------- source/libs/stream/src/streamTaskSm.c | 10 +++++++++- 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 9421b6b1af..13385a544a 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -603,7 +603,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); @@ -617,9 +617,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaReleaseTask(pMeta, pTask); } - // TODO: send the checkpoint complete msg if it is in checkpoint procedure. - - // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1a67b08749..5fb7db233f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -720,14 +720,21 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { // this function is only invoked by source task, and send rsp to mnode int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pReadyMsgList) == 1); - SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0); + taosThreadMutexLock(&pTask->lock); - tmsgSendRsp(&pInfo->msg); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - taosArrayClear(pTask->pReadyMsgList); - stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); + if (taosArrayGetSize(pTask->pReadyMsgList) == 1) { + SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0); + tmsgSendRsp(&pInfo->msg); + taosArrayClear(pTask->pReadyMsgList); + stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); + } else { + stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel); + } + + taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index db0217f000..335f9d27d5 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -302,17 +302,25 @@ static void freeUpstreamItem(void* p) { } void tFreeStreamTask(SStreamTask* pTask) { - int32_t taskId = pTask->id.taskId; - + char* p = NULL; + int32_t taskId = pTask->id.taskId; STaskExecStatisInfo* pStatis = &pTask->execInfo; - stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState); + ETaskStatus status1 = TASK_STATUS__UNINIT; + taosThreadMutexLock(&pTask->lock); + if (pTask->status.pSM != NULL) { + status1 = streamTaskGetStatus(pTask, &p); + } + taosThreadMutexUnlock(&pTask->lock); + + stDebug("start to free s-task:0x%x, %p, state:%s", taskId, pTask, p); + + SCheckpointInfo* pCkInfo = &pTask->chkInfo; stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 " nextProcessVer:%" PRId64 ", checkpointCount:%d", taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, - pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer, - pStatis->checkpoint); + pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint); // remove the ref by timer while (pTask->status.timerActive > 0) { @@ -335,7 +343,6 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->msgInfo.pTimer = NULL; } - int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputq.queue) { streamQueueClose(pTask->inputq.queue, pTask->id.taskId); } @@ -377,9 +384,8 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->pState) { stDebug("s-task:0x%x start to free task state", taskId); - streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); taskDbRemoveRef(pTask->pBackend); - } if (pTask->id.idStr != NULL) { @@ -396,7 +402,6 @@ void tFreeStreamTask(SStreamTask* pTask) { } pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); - streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index cac3766893..68ae1cce36 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -56,6 +56,7 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask); static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask); static int32_t initStateTransferTable(); static void doInitStateTransferTable(void); +static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask); static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, __state_trans_succ_fn succFn, @@ -87,6 +88,13 @@ static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { return 0; } +int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamTaskSendCheckpointSourceRsp(pTask); + } + return 0; +} + int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); @@ -551,7 +559,7 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans);