From e1c4451f81fb3fae33197187b5f2761655ddd85d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Feb 2025 17:46:54 +0800 Subject: [PATCH] fix(stream): avoid lock in timer to avoid sync operation blocks. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tqStreamTask.c | 11 +++++++---- source/libs/stream/src/streamUtil.c | 9 +++++++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c4c0aaf742..de9466f736 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -788,6 +788,7 @@ void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta); bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); void streamMetaRLock(SStreamMeta* pMeta); +int32_t streamMetaTryRlock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b34ea78f64..77765e94c6 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -146,9 +146,13 @@ static void doStartScanWal(void* param, void* tmrId) { return; } - streamMetaRLock(pMeta); - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - streamMetaRUnLock(pMeta); + code = streamMetaTryRlock(pMeta); + if (code == 0) { + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + streamMetaRUnLock(pMeta); + } else { + numOfTasks = 0; + } if (numOfTasks == 0) { goto _end; @@ -167,7 +171,6 @@ static void doStartScanWal(void* param, void* tmrId) { } _end: - streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index 4c481e6041..11e291c876 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -54,6 +54,15 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { } } +int32_t streamMetaTryRlock(SStreamMeta* pMeta) { + int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock); + if (code) { + stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } + + return code; +} + void streamMetaWLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wlock", pMeta->vgId); int32_t code = taosThreadRwlockWrlock(&pMeta->lock);