From 3701ded76713eec68812236b7cf5a5e66f3d716a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Aug 2024 16:56:43 +0800 Subject: [PATCH] fix(stream): remove invalid assert. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 ++-- source/libs/stream/src/streamCheckpoint.c | 53 +++++++++++++++++----- source/libs/stream/src/streamDispatch.c | 4 +- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 09acfcbafc..dc58bfd8c4 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1008,7 +1008,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, (int32_t)pReq->downstreamTaskId, checkpointId, transId); code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, - TSDB_CODE_SUCCESS); + TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; streamTaskGetTriggerRecvStatus(pTask, &recv, &total); @@ -1050,8 +1050,10 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) return code; } - tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%" PRId64 ", transId:%d", - pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId); + tqDebug( + "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " + "checkpointId:%" PRId64 ", transId:%d", + pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId); code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 270f678d26..4bf74d8d4f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -128,6 +128,28 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri return TSDB_CODE_SUCCESS; } + streamMutexLock(&pTask->lock); + SStreamTaskState status = streamTaskGetStatus(pTask); + if (status.state != TASK_STATUS__CK) { + stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name); + streamMutexUnlock(&pTask->lock); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + streamMutexUnlock(&pTask->lock); + + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + streamMutexLock(&pInfo->lock); + if (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId) { + stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name); + + streamMutexUnlock(&pInfo->lock); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + streamMutexUnlock(&pInfo->lock); + + // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions. (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, pRsp->upstreamTaskId); return TSDB_CODE_SUCCESS; @@ -136,10 +158,8 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - - void* pBuf = rpcMallocCont(size); + void* pBuf = rpcMallocCont(size); if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -1016,7 +1036,6 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) const char* id = pTask->id.idStr; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SStreamTaskState pStatus = streamTaskGetStatus(pTask); - bool alreadySend = false; if (pStatus.state != TASK_STATUS__CK) { return false; @@ -1126,7 +1145,11 @@ int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + int64_t now = taosGetTimestampMs(); int32_t taskId = 0; + int32_t total = streamTaskGetNumOfDownstream(pTask); + bool alreadyRecv = false; + streamMutexLock(&pInfo->lock); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { @@ -1136,11 +1159,16 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } if (p->nodeId == vgId) { - ASSERT(p->recved == false); - - p->recved = true; - p->recvTs = taosGetTimestampMs(); - taskId = p->taskId; + if (p->recved) { + stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64 + " discard", + pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs); + alreadyRecv = true; + } else { + p->recved = true; + p->recvTs = taosGetTimestampMs(); + taskId = p->taskId; + } break; } } @@ -1148,12 +1176,13 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo); streamMutexUnlock(&pInfo->lock); - int32_t total = streamTaskGetNumOfDownstream(pTask); if (taskId == 0) { stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId); } else { - stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", - pTask->id.idStr, taskId, vgId, numOfConfirmed, total); + if (!alreadyRecv) { + stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", + pTask->id.idStr, taskId, vgId, numOfConfirmed, total); + } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4da108507a..5a9a60db1d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1388,7 +1388,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // we only set the dispatch msg info for current checkpoint trans streamMutexLock(&pTask->lock); triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) && - (pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId); + (pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) && + (pTask->chkInfo.pActiveInfo->transId != pMsgInfo->transId); streamMutexUnlock(&pTask->lock); streamMutexLock(&pMsgInfo->lock); @@ -1449,7 +1450,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (delayDispatch) { // we only set the dispatch msg info for current checkpoint trans if (triggerDispatchRsp) { - ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);