From f9801ba9c54235cb9bd0c1b57d506c6e8f9388c7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Apr 2023 18:58:07 +0800 Subject: [PATCH] enh(stream): stop stream asap. --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 2 +- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 23 +++++++++++++++++++ source/libs/executor/src/timewindowoperator.c | 5 ++++ 5 files changed, 31 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 0244a4fd6e..16e7ffc536 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -119,6 +119,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) pVnode->pFetchQ->threadId); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + tqNotifyClose(pVnode->pImpl->pTq); dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); @@ -141,7 +142,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) dInfo("vgId:%d, vnode is closed", pVnode->vgId); if (commitAndRemoveWal) { - char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP); dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path); tfsRmdir(pMgmt->pTfs, path); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index db17e4f533..fdd1ece41a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -121,6 +121,7 @@ struct STQ { TTB* pExecStore; TTB* pCheckStore; SStreamMeta* pStreamMeta; + bool closing; }; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52a..416bc6cdc7 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -190,6 +190,7 @@ int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int tqInit(); void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); +void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 962b89732e..1a7af742ba 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -97,6 +97,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { return NULL; } + pTq->closing = false; pTq->path = taosStrdup(path); pTq->pVnode = pVnode; pTq->walLogLastVer = pVnode->pWal->vers.lastVer; @@ -154,6 +155,28 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } +void tqNotifyClose(STQ* pTq) { + if (pTq != NULL) { + pTq->closing = true; + taosWLockLatch(&pTq->pStreamMeta->lock); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + } + + taosWUnLockLatch(&pTq->pStreamMeta->lock); + } +} + static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type) { int32_t len = 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 007a6f63d1..ef9dc779d9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2541,6 +2541,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } while (1) { + if (isTaskKilled(pTaskInfo)) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; @@ -2635,6 +2639,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { taosArrayPush(pInfo->pUpdated, pIte); } + tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; taosArraySort(pInfo->pUpdated, winKeyCmprImpl);