diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f7d42794be..a458db8e68 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -789,6 +789,7 @@ void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta); bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); void streamMetaRLock(SStreamMeta* pMeta); +int32_t streamMetaTryRlock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index bb1bedd022..f37f77ef2a 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -148,9 +148,13 @@ static void doStartScanWal(void* param, void* tmrId) { return; } - streamMetaRLock(pMeta); - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - streamMetaRUnLock(pMeta); + code = streamMetaTryRlock(pMeta); + if (code == 0) { + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + streamMetaRUnLock(pMeta); + } else { + numOfTasks = 0; + } if (numOfTasks == 0) { goto _end; @@ -169,7 +173,6 @@ static void doStartScanWal(void* param, void* tmrId) { } _end: - streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index 4c481e6041..11e291c876 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -54,6 +54,15 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { } } +int32_t streamMetaTryRlock(SStreamMeta* pMeta) { + int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock); + if (code) { + stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } + + return code; +} + void streamMetaWLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wlock", pMeta->vgId); int32_t code = taosThreadRwlockWrlock(&pMeta->lock);