From 8c15f59d68f3dabf22f0ac34aa1b8b6b7d0d95f1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Sep 2024 14:22:59 +0800 Subject: [PATCH] fix(stream): fix the deadlock. --- source/libs/stream/src/streamBackendRocksdb.c | 3 --- source/libs/stream/src/streamCheckpoint.c | 11 +---------- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 57f75743ac..d27aaf39e1 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3439,7 +3439,6 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { } int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - stDebug("streamStateGetGroupKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -3885,7 +3884,6 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con } int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { - stDebug("streamStateSessionGetKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -3985,7 +3983,6 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK return NULL; } int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - stDebug("streamStateFillGetKVByCur_rocksdb"); if (!pCur) { return -1; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 769116264d..47eec2119a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -900,9 +900,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - -// streamMutexUnlock(&pTask->lock); -// streamMetaReleaseTask(pTask->pMeta, pTask); return -1; } @@ -911,9 +908,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - -// streamMutexUnlock(&pActiveInfo->lock); -// streamMetaReleaseTask(pTask->pMeta, pTask); return -1; } @@ -922,9 +916,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", id, vgId, ref); - -// streamMutexUnlock(&pActiveInfo->lock); -// streamMetaReleaseTask(pTask->pMeta, pTask); return -1; } @@ -1020,7 +1011,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { int32_t code = doChkptStatusCheck(pTask); if (code) { - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pActiveInfo->lock); streamMetaReleaseTask(pTask->pMeta, pTask); return; }