From e734569de07850ef02c7077fd95b702debb44a1b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Dec 2023 18:42:57 +0800 Subject: [PATCH] fix(stream): fix dead lock. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 +++--- source/libs/stream/src/streamMeta.c | 12 ------------ source/libs/stream/src/streamStart.c | 3 ++- 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 004eb240b3..a42f81edab 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -811,7 +811,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { STaskStartInfo* pStartInfo = &pMeta->startInfo; - taosWLockLatch(&pMeta->lock); + streamMetaWLock(pMeta); if (pStartInfo->restartCount > 0) { pStartInfo->restartCount -= 1; @@ -820,10 +820,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role, pStartInfo->restartCount); - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); } else { - taosWUnLockLatch(&pMeta->lock); + streamMetaWUnLock(pMeta); tqDebug("vgId:%d start all tasks completed", pMeta->vgId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8868a5719c..e94c1b7596 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -348,18 +348,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - // pMeta->chkpId = streamGetLatestCheckpointId(pMeta); - // pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - // while (pMeta->streamBackend == NULL) { - // qError("vgId:%d failed to init stream backend", pMeta->vgId); - // taosMsleep(2 * 1000); - // qInfo("vgId:%d retry to init stream backend", pMeta->vgId); - // pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - // if (pMeta->streamBackend == NULL) { - // } - // } - // pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index dd713290f3..8bd7984d29 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1107,12 +1107,13 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); + streamMetaWUnLock(pMeta); pStartInfo->completeFn(pMeta); } else { + streamMetaWUnLock(pMeta); stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } - streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; }