From 1ccbb7798131c14ecf7485c583c9dc4162a55889 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Oct 2023 19:11:24 +0800 Subject: [PATCH] fix(stream): fix dead lock when shutdown all tasks. --- include/libs/stream/tstream.h | 6 +----- source/dnode/vnode/src/tq/tq.c | 9 ++++++++- source/libs/stream/inc/streamInt.h | 5 +++++ source/libs/stream/src/streamMeta.c | 11 ++++++----- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 54ca7f1566..a027b4f9ba 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -181,11 +181,6 @@ typedef struct { SSDataBlock* pBlock; } SStreamRefDataBlock; -typedef struct { - int8_t type; - SSDataBlock* pBlock; -} SStreamTrigger; - typedef struct SStreamQueueNode SStreamQueueNode; struct SStreamQueueNode { @@ -804,6 +799,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); +bool streamMetaTaskInTimer(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8fec925db..dbd1e02732 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1924,8 +1924,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { taosWUnLockLatch(&pMeta->lock); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); - terrno = 0; + taosWUnLockLatch(&pMeta->lock); + + while (streamMetaTaskInTimer(pMeta)) { + qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + taosWLockLatch(&pMeta->lock); int32_t code = streamMetaReopen(pMeta); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d98fa2f436..c63b51d745 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -52,6 +52,11 @@ extern "C" { #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +typedef struct { + int8_t type; + SSDataBlock* pBlock; +} SStreamTrigger; + typedef struct SStreamGlobalEnv { int8_t inited; void* timer; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7ffddf9207..c536d01516 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -274,6 +274,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } void streamMetaClear(SStreamMeta* pMeta) { + // remove all existed tasks in this vnode void* pIter = NULL; while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) { SStreamTask* p = *(SStreamTask**)pIter; @@ -694,7 +695,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tFreeStreamTask(pTask); stError( "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " - "stream manually", vgId, tsDataDir); + "stream manually", + vgId, tsDataDir); return -1; } tDecoderClear(&decoder); @@ -855,7 +857,7 @@ void metaHbToMnode(void* param, void* tmrId) { } // not leader not send msg - if (pMeta->role == NODE_ROLE_FOLLOWER) { + if (pMeta->role != NODE_ROLE_LEADER) { stInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); taosReleaseRef(streamMetaId, rid); pMeta->pHbInfo->hbStart = 0; @@ -978,9 +980,8 @@ void metaHbToMnode(void* param, void* tmrId) { taosReleaseRef(streamMetaId, rid); } -static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { +bool streamMetaTaskInTimer(SStreamMeta* pMeta) { bool inTimer = false; - taosWLockLatch(&pMeta->lock); void* pIter = NULL; @@ -1034,7 +1035,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { stDebug("vgId:%d start to check all tasks", vgId); int64_t st = taosGetTimestampMs(); - while (hasStreamTaskInTimer(pMeta)) { + while (streamMetaTaskInTimer(pMeta)) { stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); }