From 5a9500055daf8f28d21f4bbd2ce21ed5ce150130 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 18:44:54 +0800 Subject: [PATCH] fix(stream): check the task status checkpoint-ready msg send timer. --- source/libs/stream/src/streamBackendRocksdb.c | 6 +++++- source/libs/stream/src/streamCheckpoint.c | 9 ++++----- source/libs/stream/src/streamDispatch.c | 8 ++++++++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cd2f304991..c151193284 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2053,7 +2053,11 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { stInfo("%s newly create db in state-backend", key); // pre create db pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); - if (pTaskDb->db == NULL) goto _EXIT; + if (pTaskDb->db == NULL) { + stError("%s open state-backend failed, reason:%s", key, err); + goto _EXIT; + } + rocksdb_close(pTaskDb->db); if (cfNames != NULL) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d8b86edf53..4c4f5e98ab 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -737,14 +737,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } pActiveInfo->checkCounter = 0; - stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now); + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state != TASK_STATUS__CK) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr, - vgId, ref); + stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pTask->pMeta, pTask); @@ -754,8 +753,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", - pTask->id.idStr, vgId, ref); + stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, + ref); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pTask->pMeta, pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 3652de3eba..dce30cc028 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -796,6 +796,14 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); taosThreadMutexLock(&pActiveInfo->lock); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state != TASK_STATUS__CK) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, ref); + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } SArray* pList = pActiveInfo->pReadyMsgList; SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));