diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 6d52b10182..0076d79312 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -37,6 +37,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index 87c756b10c..96701fe21d 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -181,6 +181,14 @@ typedef struct SRetrieveChkptTriggerReq { int64_t downstreamTaskId; } SRetrieveChkptTriggerReq; +typedef struct SCheckpointTriggerRsp { + int64_t streamId; + int64_t checkpointId; + int32_t upstreamTaskId; + int32_t taskId; + int32_t transId; +} SCheckpointTriggerRsp; + typedef struct { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 185ab7ad51..0d01913235 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -678,8 +678,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId, - SEpSet* pEpset); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); @@ -753,6 +752,7 @@ tmr_h streamTimerGetInstance(); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); +int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); @@ -764,6 +764,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); +SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); // stream task state machine, and event handling SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ac10aa83a4..89ab6d52c3 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -136,6 +136,10 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER: + return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); + case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: + return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 59599fdae6..b369bd6039 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -254,7 +254,8 @@ int tqScanWalAsync(STQ* pTq, bool ckPause); int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6babcf3c80..7138ecbeaa 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -298,7 +298,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask); - + pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo(); pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b54429f6b6..d8460d2cb5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1240,10 +1240,14 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); } -int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg); } +int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamTaskProcessRetrieveTriggerRsp(pTq->pStreamMeta, pMsg); +} + // this function is needed, do not try to remove it. int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 5a29f67ae3..bcf17bf1e1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -291,8 +291,7 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems } } else { walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); - tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, - pTask->chkInfo.nextProcessVer); + tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); break; } } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f87556e24e..ee3f1a3760 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -887,9 +887,9 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { // re-send the lost checkpoint-trigger msg to downstream task - SEpSet* pEpset = streamTaskGetDownstreamEpInfo(pTask, pReq->downstreamTaskId); - streamTaskSendCheckpointTriggerMsg(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->downstreamTaskId, - pReq->downstreamNodeId, pEpset); + tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, + (int32_t)pReq->downstreamTaskId, checkpointId, transId); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; streamTaskGetTriggerRecvStatus(pTask, &recv, &total); @@ -914,6 +914,25 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) return TSDB_CODE_SUCCESS; } +int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + SCheckpointTriggerRsp* pRsp = pMsg->pCont; + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId); + if (pTask == NULL) { + tqError( + "vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, pRsp->taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%"PRId64", transId:%d", + pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId); + + streamTaskProcessCheckpointTriggerRsp(pTask, pRsp); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; +} + int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 490c3f08ce..426f85fa5e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -843,7 +843,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER: - return tqProcessTaskRetrieveTriggerMsg(pVnode->pTq, pMsg); + return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: @@ -851,7 +851,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: - return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); default: diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index f083ff8a61..d5c676433f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -246,7 +246,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* } int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, - SRpcHandleInfo* pRpcInfo, int32_t taskId) { + SRpcHandleInfo* pRpcInfo, int32_t taskId) { SEncoder encoder; int32_t code; int32_t len; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 968493b595..fb3c705a4d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -118,23 +118,40 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); } -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId, - SEpSet* pEpset) { - SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType); +int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { + ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE); - pChkpoint->srcTaskId = pTask->id.taskId; - pChkpoint->srcVgId = pTask->pMeta->vgId; - - int32_t code = streamTaskBuildAndSendTriggerMsg(pTask, pChkpoint, dstTaskId, vgId, pEpset); - if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s build and send checkpoint-trigger dispatch msg succ, stage:%" PRId64, pTask->id.idStr, - pTask->pMeta->stage); - } else { - // todo handle send data failure - stError("s-task:%s failed to build and send trigger msg", pTask->id.idStr); + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + if (pInfo->transId != pRsp->transId || pInfo->activeId != pRsp->checkpointId) { + // todo handle error + return -1; } - return code; + taosThreadMutexLock(&pTask->lock); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state != TASK_STATUS__CK) { + // todo handle error + taosThreadMutexUnlock(&pTask->lock); + return -1; + } + + taosThreadMutexUnlock(&pTask->lock); + + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo) { + SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp)); + pRsp->streamId = pTask->id.streamId; + pRsp->upstreamTaskId = pTask->id.taskId; + pRsp->taskId = dstTaskId; + pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; + pRsp->transId = pTask->chkInfo.pActiveInfo->transId; + + SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo}; + tmsgSendRsp(&rspMsg); + return 0; } int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { @@ -617,8 +634,15 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { int32_t vgId = pTask->pMeta->vgId; const char* pId = pTask->id.idStr; int32_t size = taosArrayGetSize(pNotSendList); + int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask); - stDebug("s-task:%s start to send trigger-retrieve msg to %d upstream(s)", pId, size); + if (size <= 0) { + stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId); + return code; + } + + stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size, + numOfUpstream); for (int32_t i = 0; i < size; i++) { SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i); @@ -643,21 +667,23 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); - stDebug("s-task:%s vgId:%d send retrieve msg to 0x%x checkpointId:%" PRId64, pId, vgId, pUpstreamTask->taskId, - pReq->checkpointId); + stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId, + pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId); } return TSDB_CODE_SUCCESS; } bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) { - SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + if (pStatus->state != TASK_STATUS__CK) { return false; } - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - taosThreadMutexLock(&pInfo->lock); if (!pInfo->dispatchTrigger) { taosThreadMutexUnlock(&pInfo->lock); @@ -671,12 +697,15 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) } // has send trigger msg to downstream node, + double before = (now - pSendInfo->sendTs) / 1000.0; if (pSendInfo->recved) { - stWarn("s-task:%s checkpoint-trigger msg send at:%"PRId64" and recv confirmed, checkpointId:%"PRId64 ", transId:%d", - pTask->id.idStr, pSendInfo->sendTs, pInfo->activeId, pInfo->transId); + stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64 + "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d", + id, pSendInfo->sendTs, before, pSendInfo->taskId, pInfo->activeId, pInfo->transId); } else { - stWarn("s-task:%s checkpoint-trigger send at:%"PRId64", checkpointId:%"PRId64", transId:%d", pTask->id.idStr, - pSendInfo->sendTs, pInfo->activeId, pInfo->transId); + stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64 + ", transId:%d", + id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); } taosThreadMutexUnlock(&pInfo->lock); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 821ae68497..ecea494be2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -480,7 +480,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { pTask->msgInfo.retryCount++; - stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, + stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); if (pTask->msgInfo.pTimer != NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e0415c8467..6d30cc6759 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -25,7 +25,6 @@ static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate); static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo); -static SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray);