diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9137965849..9772c7ecf3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -845,6 +845,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); +SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index fde5082212..63e71f8ee8 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -123,6 +123,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } +// todo: createMsg to invoke this function in stream threads, to avoid blocking the syn thread int32_t tqStopStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = TD_VID(pTq->pVnode); @@ -142,7 +143,7 @@ int32_t tqStopStreamTasks(STQ* pTq) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 151605493a..686cf841e1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -607,22 +607,23 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { return (int32_t)size; } -SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - streamMetaRLock(pMeta); - +SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { STaskId id = {.streamId = streamId, .taskId = taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask != NULL) { - if (!streamTaskShouldStop(*ppTask)) { - int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); - streamMetaRUnLock(pMeta); - stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); - return *ppTask; - } + if (ppTask == NULL || streamTaskShouldStop(*ppTask)) { + return NULL; } + int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); + stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); + return *ppTask; +} + +SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + streamMetaRLock(pMeta); + SStreamTask* p = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId); streamMetaRUnLock(pMeta); - return NULL; + return p; } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { @@ -1293,6 +1294,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { void streamMetaRLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-rlock", pMeta->vgId); taosRLockLatch(&pMeta->lock); + ASSERT(pMeta->lock != 0x40000001); } void streamMetaRUnLock(SStreamMeta* pMeta) { @@ -1303,6 +1305,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wlock", pMeta->vgId); taosWLockLatch(&pMeta->lock); + ASSERT(pMeta->lock != 0x40000001); stTrace("vgId:%d meta-wlock completed", pMeta->vgId); }