From 77961ea79102b7b318dc29038a559ea8392f69e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 09:45:42 +0800 Subject: [PATCH] fix(stream): handle disorder node Update trans Id. --- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 21 +++++++++++++++------ source/libs/stream/src/streamStart.c | 1 - 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ba96dc0adf..d34e23c0ba 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 60; +int32_t tsStreamCheckpointInterval = 300; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ee4f5366d6..04c0c0d204 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -195,13 +195,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM const char* idstr = pTask->id.idStr; if (pMeta->updateInfo.transId != req.transId) { - ASSERT(req.transId > pMeta->updateInfo.transId); - tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, - vgId, req.transId, pMeta->updateInfo.transId); + if (req.transId < pMeta->updateInfo.transId) { + tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, + pMeta->updateInfo.transId, req.transId); + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); - // info needs to be kept till the new trans to update the nodeEp arrived. - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = req.transId; + taosArrayDestroy(req.pNodeList); + return rsp.code; + } else { + tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, + vgId, req.transId, pMeta->updateInfo.transId); + + // info needs to be kept till the new trans to update the nodeEp arrived. + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = req.transId; + } } else { tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 1f6c5add42..7b8e6e2129 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -432,7 +432,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskId* pId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } - } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms ASSERT(left > 0); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,