From 8d54d45054f9e2f85e7a045e6a45a56750efc689 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 May 2024 17:30:14 +0800 Subject: [PATCH] fix(stream): fix error found by CI. --- source/dnode/vnode/src/tq/tq.c | 4 ---- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 18 +++++++++++++----- source/libs/stream/src/streamDispatch.c | 9 +++++---- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 037c2a7b7a..b54429f6b6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1088,10 +1088,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; -// if (pTq->pStreamMeta->vgId == 2) { -// ASSERT(0); -// } - // disable auto rsp to mnode pRsp->info.handle = NULL; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a137086d21..f87556e24e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -864,7 +864,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { } int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg; + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId); if (pTask == NULL) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 43b39b8574..968493b595 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -562,11 +562,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); - stDebug("s-task:%s vgId:%d checkpoint-trigger monit start, ts:%" PRId64, pTask->id.idStr, vgId, now); + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor start, ts:%" PRId64, pTask->id.idStr, vgId, now); taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); - if (pState->state == TASK_STATUS__CK) { + if (pState->state != TASK_STATUS__CK) { stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId); taosThreadMutexUnlock(&pTask->lock); return; @@ -599,12 +599,14 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } // do send retrieve checkpoint trigger msg to upstream + int32_t size = taosArrayGetSize(pNotSendList); doSendRetrieveTriggerMsg(pTask, pNotSendList); taosThreadMutexUnlock(&pActiveInfo->lock); // check every 100ms - if (taosArrayGetSize(pNotSendList) > 0) { + if (size > 0) { taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActiveInfo->pCheckTmr); + stDebug("s-task:%s start monitor trigger in 10sec", pTask->id.idStr); } taosArrayDestroy(pNotSendList); @@ -614,8 +616,11 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { int32_t code = 0; int32_t vgId = pTask->pMeta->vgId; const char* pId = pTask->id.idStr; + int32_t size = taosArrayGetSize(pNotSendList); - for (int32_t i = 0; i < taosArrayGetSize(pNotSendList); i++) { + stDebug("s-task:%s start to send trigger-retrieve msg to %d upstream(s)", pId, size); + + for (int32_t i = 0; i < size; i++) { SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i); SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq)); @@ -633,10 +638,13 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { pReq->upstreamNodeId = pUpstreamTask->nodeId; pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId; + SRpcMsg rpcMsg = {0}; - initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, pReq, sizeof(SRetrieveChkptTriggerReq)); + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); + stDebug("s-task:%s vgId:%d send retrieve msg to 0x%x checkpointId:%" PRId64, pId, vgId, pUpstreamTask->taskId, + pReq->checkpointId); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c8a626a739..821ae68497 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -595,8 +595,9 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER || - pBlock->type == STREAM_INPUT__TRANS_STATE); + int32_t type = pBlock->type; + ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; pTask->msgInfo.startTs = taosGetTimestampMs(); @@ -607,7 +608,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { // todo handle build dispatch msg failed } - if (pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { streamTaskInitTriggerDispatchInfo(pTask); } @@ -829,7 +830,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa } } else { taosArrayPush(pActiveInfo->pReadyMsgList, &info); - stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size); + stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size + 1); } taosThreadMutexUnlock(&pActiveInfo->lock);