From 077745390dc75dfc19d311e92f0c6e29a115e403 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 3 Aug 2024 16:34:26 +0800 Subject: [PATCH 1/6] fix(stream): add more check in tmr. --- source/libs/stream/src/streamCheckStatus.c | 7 ------- source/libs/stream/src/streamCheckpoint.c | 22 ++++++++++++++++++++++ source/libs/stream/src/streamDispatch.c | 13 ++++++++++++- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index c9ba6ffcfe..b7661e72d4 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage != stage) { - streamMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - streamMutexUnlock(&pTask->lock); - return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f817447099..741e3cc882 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -838,6 +838,28 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { return; } + if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 + ", quit, ref:%d", + id, vgId, pTmrInfo->launchChkptId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // active checkpoint info is cleared for now + if 
((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", + id, vgId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 255afb44f9..9e07059e53 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -819,8 +819,19 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); + if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 + ", quit, ref:%d", + id, vgId, pTmrInfo->launchChkptId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + // active checkpoint info is cleared for now - if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { + if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { streamMutexUnlock(&pActiveInfo->lock); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); From 6a6ab9ff6a0b57426d2ab290192df0a5a5b9f02e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 4 Aug 2024 11:37:23 +0800 Subject: [PATCH 2/6] fix(stream): add check for checkpointId in 
retrieve-checkpoint id msg. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b56c474ed5..11d38dde87 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -989,7 +989,12 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); - ASSERT(checkpointId == pReq->checkpointId); + if (checkpointId != pReq->checkpointId) { + tqError("s-task:%s invalid checkpoint-trigger retrieve msg from %x, current checkpointId:%"PRId64" req:%"PRId64, + pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_INVALID_MSG; + } if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { // re-send the lost checkpoint-trigger msg to downstream task From 59270dfd0dfe27c2b1f77bafdf8a2133e3b10af7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 16:56:30 +0800 Subject: [PATCH 3/6] fix(stream): check status before start timer. 
--- source/libs/stream/src/streamDispatch.c | 30 ++++++++++++++++--------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5a9a60db1d..7937402ccc 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -762,18 +762,26 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - streamMutexLock(&pTask->msgInfo.lock); - if (pTask->msgInfo.inMonitor == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, - ref, tstrerror(code)); - streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - pTask->msgInfo.inMonitor = 1; - } else { - stDebug("s-task:%s already in dispatch monitor tmr", id); - } + streamMutexLock(&pTask->lock); + bool shouldStop = streamTaskShouldStop(pTask); + streamMutexLock(&pTask->lock); - streamMutexUnlock(&pTask->msgInfo.lock); + if (shouldStop) { + stDebug("s-task:%s in stop/dropping status, not start dispatch monitor tmr", id); + } else { + streamMutexLock(&pTask->msgInfo.lock); + if (pTask->msgInfo.inMonitor == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, + ref, tstrerror(code)); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + pTask->msgInfo.inMonitor = 1; + } else { + stDebug("s-task:%s already in dispatch monitor tmr", id); + } + + streamMutexUnlock(&pTask->msgInfo.lock); + } // this block can not be deleted until it has been sent to downstream task successfully. 
return TSDB_CODE_SUCCESS; From 761ae2ab4b3767806ce8a44ef009044688eb090f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 16:59:43 +0800 Subject: [PATCH 4/6] refactor: remove the early-quit checks from checkpointTriggerMonitorFn. --- source/libs/stream/src/streamCheckpoint.c | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7e4d212457..4bf74d8d4f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -938,28 +938,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { return; } - if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { - streamMutexUnlock(&pActiveInfo->lock); - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 - ", quit, ref:%d", - id, vgId, pTmrInfo->launchChkptId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // active checkpoint info is cleared for now - if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", - id, vgId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); From 3274194d219d151fb0b75b6bc3158b9bbe267edb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 17:11:43 +0800 Subject: [PATCH 5/6] refactor: add a todo comment in streamDispatchStreamBlock.
--- source/libs/stream/src/streamDispatch.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7937402ccc..86970f80fa 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -762,6 +762,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); + // todo: update timerActive and start the timer while holding pTask->lock streamMutexLock(&pTask->lock); bool shouldStop = streamTaskShouldStop(pTask); streamMutexLock(&pTask->lock); From 44466a4bcdb3e0eb14c13c688b69c3e7b0c0e508 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Aug 2024 22:28:13 +0800 Subject: [PATCH 6/6] fix(stream): fix a typo --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 86970f80fa..bf64af6558 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -765,7 +765,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // todo: update timerActive and start the timer while holding pTask->lock streamMutexLock(&pTask->lock); bool shouldStop = streamTaskShouldStop(pTask); - streamMutexLock(&pTask->lock); + streamMutexUnlock(&pTask->lock); if (shouldStop) { stDebug("s-task:%s in stop/dropping status, not start dispatch monitor tmr", id);