From b0a4ed3217c8122b261704c990ecda5f1f187c49 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Sep 2023 23:00:29 +0800 Subject: [PATCH] fix(stream): drop related fill-history task when dropping stream tasks. --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 32 ++++++++++++++++++++------- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2fab7c087a..2ffdf2fced 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -247,7 +247,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9780a1f046..455937b603 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1406,20 +1406,36 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } } -int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); - streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); + + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask != NULL) { + // drop the related fill-history task firstly + if (pTask->hTaskInfo.id.taskId != 0) { + STaskId* pHTaskId = &pTask->hTaskInfo.id; + streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); + tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); + } + } + streamMetaReleaseTask(pMeta, pTask); + + // drop the stream task now + streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); // commit the update - taosWLockLatch(&pTq->pStreamMeta->lock); - int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); - tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks); + taosWLockLatch(&pMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); - if (streamMetaCommit(pTq->pStreamMeta) < 0) { + if (streamMetaCommit(pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5622568b7b..f0cfe6cd0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -567,7 +567,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break;