diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 99ca2104ff..26c6631ee4 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -271,7 +271,6 @@ typedef struct SCheckpointInfo {
   int64_t checkpointTime;  // latest checkpoint time
   int64_t processedVer;
   int64_t nextProcessVer;  // current offset in WAL, not serialize it
-  int32_t numOfNotReady;
   SActiveCheckpointInfo* pActiveInfo;
   int64_t msgVer;
 } SCheckpointInfo;
@@ -753,7 +752,8 @@ tmr_h streamTimerGetInstance();
 // checkpoint
 int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
 int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp);
-int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId);
+int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, int32_t downstreamTaskId);
+int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId);
 int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
 void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
 int32_t streamAlignTransferState(SStreamTask* pTask);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 1076f1f2c6..712cfbaa55 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1229,6 +1229,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
 
 // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
 int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
+  int32_t vgId = TD_VID(pTq->pVnode);
+
+  // pMsg->pCont is not a SRetrieveChkptTriggerReq here, so reject on role alone and read no field out of the payload
+  if (!vnodeIsRoleLeader(pTq->pVnode)) {
+    tqError("vgId:%d not leader, ignore the checkpoint-ready msg", vgId);
+    return TSDB_CODE_STREAM_NOT_LEADER;
+  }
+
   return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
 }
 
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index b779cbe932..c55745e5c5 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -19,6 +19,13 @@
 
 typedef struct SMStreamCheckpointReadyRspMsg {
   SMsgHead head;
+  int64_t streamId;
+  int32_t upstreamTaskId;
+  int32_t upstreamNodeId;
+  int32_t downstreamTaskId;
+  int32_t downstreamNodeId;
+  int64_t checkpointId;
+  int32_t transId;
 } SMStreamCheckpointReadyRspMsg;
 
 static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
@@ -486,21 +493,27 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
-    return code;
+    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
   }
 
-  tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId,
+  tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
           pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
 
-  streamProcessCheckpointReadyMsg(pTask, req.downstreamTaskId, req.downstreamNodeId);
+  streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);  // (nodeId, taskId) per prototype
   streamMetaReleaseTask(pMeta, pTask);
 
   {  // send checkpoint ready rsp
-    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
-    rsp.pCont = rpcMallocCont(rsp.contLen);
-    SMsgHead* pHead = rsp.pCont;
-    pHead->vgId = htonl(req.downstreamNodeId);
+    SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
+
pReadyRsp->upstreamTaskId = req.upstreamTaskId;
+    pReadyRsp->upstreamNodeId = req.upstreamNodeId;
+    pReadyRsp->downstreamTaskId = req.downstreamTaskId;
+    pReadyRsp->downstreamNodeId = req.downstreamNodeId;
+    pReadyRsp->checkpointId = req.checkpointId;
+    pReadyRsp->streamId = req.streamId;
+    pReadyRsp->head.vgId = htonl(req.downstreamNodeId);
+
+    SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
 
     tmsgSendRsp(&rsp);
 
     pMsg->info.handle = NULL;  // disable auto rsp
@@ -1066,5 +1079,24 @@ int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return d
 int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
 
 int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
-  return doProcessDummyRspMsg(pMeta, pMsg);
+  // A failed request can come back without the fixed-size rsp body (NOTE(review): presumably the auto rsp that stays
+  // enabled when the upstream task is not found), so validate the payload before reading any field from it.
+  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMStreamCheckpointReadyRspMsg)) {
+    tqError("vgId:%d invalid checkpoint-ready rsp, code:0x%x contLen:%d, discard it", pMeta->vgId, pMsg->code,
+            pMsg->contLen);
+    return (pMsg->code != TSDB_CODE_SUCCESS) ? pMsg->code : TSDB_CODE_INVALID_MSG;
+  }
+
+  SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
+
+  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId);
+  if (pTask == NULL) {
+    tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
+            pRsp->downstreamNodeId, pRsp->downstreamTaskId);
+    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
+  }
+
+  streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
+  streamMetaReleaseTask(pMeta, pTask);
+  return TSDB_CODE_SUCCESS;
 }
\ No newline at end of file
diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h
index c4c3298ea7..154f623b9d 100644
--- a/source/libs/stream/inc/streamInt.h
+++ b/source/libs/stream/inc/streamInt.h
@@ -64,7 +64,9 @@ struct SActiveCheckpointInfo {
   int8_t allUpstreamTriggerRecv;
   SArray* pCheckpointReadyRecvList;  // SArray
   int32_t checkCounter;
-  tmr_h pCheckTmr;
+  tmr_h pChkptTriggerTmr;
+  int32_t sendReadyCheckCounter;
+  tmr_h pSendReadyMsgTmr;
 };
 
 typedef struct {
@@ -99,12 +101,13 @@ struct STokenBucket {
 typedef struct {
   int32_t upstreamTaskId;
SEpSet upstreamNodeEpset; - int32_t nodeId; + int32_t upstreamNodeId; int32_t transId; - SRpcMsg msg; + int32_t childId; + SRpcMsg msg; // for mnode checkpoint-source rsp int64_t checkpointId; int64_t recvTs; - int32_t sendToUpstream; + int32_t sendCompleted; } STaskCheckpointReadyInfo; typedef struct { @@ -213,8 +216,10 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask); -int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId, - int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId); +int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId, + int32_t childId, SEpSet* pEpset, int64_t checkpointId); +int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId, + int64_t checkpointId, SRpcMsg* pMsg); typedef int32_t (*__stream_async_exec_fn_t)(void* param); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f2868fea96..b0c4884c73 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -111,7 +111,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->chkInfo.pActiveInfo->transId = pReq->transId; pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId; - pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -192,12 +191,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (pTask->chkInfo.checkpointId == checkpointId) { { // send checkpoint-ready msg to upstream + SRpcMsg msg ={0}; + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, 
pBlock->srcTaskId); - - STaskCheckpointReadyInfo info = {0}; - initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); - - tmsgSendReq(&info.upstreamNodeEpset, &info.msg); + initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); + tmsgSendReq(&pInfo->epSet, &msg); } stWarn( @@ -235,7 +233,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock ASSERT(p->checkpointId == checkpointId); stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", - pTask->id.idStr, p->upstreamTaskId, p->nodeId, p->checkpointId, p->recvTs); + pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_SUCCESS; @@ -267,10 +265,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - if (pActiveInfo->pCheckTmr == NULL) { - pActiveInfo->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); + if (pActiveInfo->pChkptTriggerTmr == NULL) { + pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); } else { - taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr); + taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr); } } @@ -282,7 +280,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - streamProcessCheckpointReadyMsg(pTask, 0, 0); + 
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -308,11 +306,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamTaskBuildCheckpoint(pTask); } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - - // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task - // can start local checkpoint procedure - pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); - // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); @@ -326,19 +319,31 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock * All down stream tasks have successfully completed the check point task. * Current stream task is allowed to start to do checkpoint things in ASYNC model. 
*/ -int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId) { +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, + int32_t downstreamTaskId) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; const char* id = pTask->id.idStr; bool received = false; int32_t total = streamTaskGetNumOfDownstream(pTask); + ASSERT(total > 0); - // only one task in this stream - if (total == 0 && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId); - taosThreadMutexUnlock(&pInfo->lock); - return 0; + // 1. not in checkpoint status now + SStreamTaskState* pStat = streamTaskGetStatus(pTask); + if (pStat->state != TASK_STATUS__CK) { + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + // 2. expired checkpoint-ready msg + if (pTask->chkInfo.checkpointId > checkpointId) { + // discard it directly + return -1; + } + + // invalid checkpoint-ready msg + if (pInfo->activeId != checkpointId) { + return -1; } taosThreadMutexLock(&pInfo->lock); @@ -354,7 +359,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo } if (received) { - stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, %d/%d downstream not ready", id, + stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. 
%d/%d downstream not ready", id, downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); } else { STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(), @@ -377,9 +382,38 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo return 0; } +int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + int64_t now = taosGetTimestampMs(); + int32_t numOfConfirmed = 0; + + taosThreadMutexLock(&pInfo->lock); + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { + STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); + if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { + pReadyInfo->sendCompleted = 1; + stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64, + pTask->id.idStr, upstreamTaskId, checkpointId, now); + break; + } + } + + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { + STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); + if (pReadyInfo->sendCompleted == 1) { + numOfConfirmed += 1; + } + } + + stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr, + numOfConfirmed, checkpointId); + + taosThreadMutexUnlock(&pInfo->lock); + return TSDB_CODE_SUCCESS; +} + void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.startTs = 0; // clear the recorded start time - pTask->chkInfo.numOfNotReady = 0; streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo); streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks @@ -703,7 +737,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } if (++pActiveInfo->checkCounter < 100) { - taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, 
streamTimer, &pActiveInfo->pCheckTmr);
+    taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
     return;
   }
 
@@ -736,7 +770,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
     bool recved = false;
     for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
       STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
-      if (pInfo->nodeId == pReady->nodeId) {
+      if (pInfo->nodeId == pReady->upstreamNodeId) {
         recved = true;
         break;
       }
@@ -756,7 +790,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
   // check every 100ms
   if (size > 0) {
     stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
-    taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
+    taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
   } else {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index aa0d7c3120..42a4e4e8fb 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -623,28 +623,163 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }
 
-// this function is usually invoked by sink/agg task
-int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
-  SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList;
+int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId,
+                               int64_t checkpointId, SRpcMsg* pMsg) {
+  int32_t code = 0;
+  int32_t tlen = 0;
+  void* buf = NULL;
 
-  taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
+  SStreamCheckpointReadyMsg req = {0};
+  req.downstreamNodeId = pTask->pMeta->vgId;
+  req.downstreamTaskId = pTask->id.taskId;
+  req.streamId = pTask->id.streamId;
+  req.checkpointId = checkpointId;
+  req.childId = childId;
+  req.upstreamNodeId = upstreamNodeId;
+  req.upstreamTaskId = upstreamTaskId;
+
+  tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code);
+  if (code < 0) {
+    return -1;
+  }
+
+  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
+  if (buf == NULL) {
+    return -1;
+  }
+
+  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
+  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+
+  SEncoder encoder;
+  tEncoderInit(&encoder, abuf, tlen);
+  if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
+    rpcFreeCont(buf);
+    return code;
+  }
+  tEncoderClear(&encoder);
+
+  initRpcMsg(pMsg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
+  return TSDB_CODE_SUCCESS;
+}
+
+static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
+  SStreamTask* pTask = param;
+  int32_t vgId = pTask->pMeta->vgId;
+  const char* id = pTask->id.idStr;
+
+  // check the status every 100ms
+  if (streamTaskShouldStop(pTask)) {
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-ready, ref:%d", id, vgId, ref);
+    streamMetaReleaseTask(pTask->pMeta, pTask);
+    return;
+  }
+
+  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
+  if (++pActiveInfo->sendReadyCheckCounter < 100) {
+    taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
+    return;
+  }
+
+  pActiveInfo->sendReadyCheckCounter = 0;
+  stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
+
+  taosThreadMutexLock(&pActiveInfo->lock);
+
+  SArray* pList = pActiveInfo->pReadyMsgList;
+  SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
 
   int32_t num = taosArrayGetSize(pList);
   ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
 
   for (int32_t i = 0; i < num; ++i) {
     STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
-    tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
+    if (pInfo->sendCompleted == 1) {
+      continue;
+    }
 
-    stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
+    taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
+    stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
+            pTask->info.taskLevel, pInfo->upstreamTaskId);
+  }
+
+  int64_t checkpointId = pActiveInfo->activeId;  // 64-bit, matches the checkpointId param of initCheckpointReadyMsg
+
+  int32_t notRsp = taosArrayGetSize(pNotRspList);
+  if (notRsp > 0) {  // send checkpoint-ready msg again
+    for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
+      int32_t taskId = *(int32_t*)taosArrayGet(pNotRspList, i);
+
+      for (int32_t j = 0; j < num; ++j) {
+        STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
+        if (taskId == pReadyInfo->upstreamTaskId) {  // send msg again
+
+          SRpcMsg msg = {0};
+          initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, pReadyInfo->childId,
+                                 checkpointId, &msg);
+          tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
+          stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel,
+                  pReadyInfo->upstreamTaskId);
+        }
+      }
+    }
+
+    taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
+    taosThreadMutexUnlock(&pActiveInfo->lock);
+  } else {
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stDebug(
+        "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear "
+        "checkpoint-ready msg, ref:%d",
+        id, vgId, ref);
+
+    streamClearChkptReadyMsg(pTask);
+    taosThreadMutexUnlock(&pActiveInfo->lock);
+    streamMetaReleaseTask(pTask->pMeta, pTask);
+  }
+
+  taosArrayDestroy(pNotRspList);
+}
+
+// this function is usually invoked by sink/agg task
+int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
+  SActiveCheckpointInfo* pActiveInfo =
pTask->chkInfo.pActiveInfo;
+
+  const char* id = pTask->id.idStr;
+  SArray* pList = pActiveInfo->pReadyMsgList;
+
+  taosThreadMutexLock(&pActiveInfo->lock);
+
+  int32_t num = taosArrayGetSize(pList);
+  ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
+
+  for (int32_t i = 0; i < num; ++i) {
+    STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
+
+    SRpcMsg msg = {0};
+    initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, &msg);
+    tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
+
+    stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x", id, pTask->info.taskLevel,
             pInfo->upstreamTaskId);
   }
 
-  taosArrayClear(pList);
+  taosThreadMutexUnlock(&pActiveInfo->lock);
+  stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num);
 
-  taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
-  stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
-          num);
+  // start to check if checkpoint ready msg has successfully received by upstream tasks.
+  pActiveInfo->pSendReadyMsgTmr = NULL;  // NOTE(review): makes the taosTmrReset branch below unreachable - confirm
+
+  int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
+  stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
+  streamMetaAcquireOneTask(pTask);
+
+  if (pActiveInfo->pSendReadyMsgTmr == NULL) {
+    pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer);
+  } else {
+    taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
+  }
 
   return TSDB_CODE_SUCCESS;
 }
@@ -816,51 +951,17 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId,
-                                int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) {
-  int32_t code = 0;
-  int32_t tlen = 0;
-  void* buf = NULL;
-
-  SStreamCheckpointReadyMsg req = {0};
-  req.downstreamNodeId = pTask->pMeta->vgId;
-  req.downstreamTaskId = pTask->id.taskId;
-  req.streamId = pTask->id.streamId;
-  req.checkpointId = checkpointId;
-  req.childId = childId;
-  req.upstreamNodeId = upstreamNodeId;
-  req.upstreamTaskId = upstreamTaskId;
-
-  tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code);
-  if (code < 0) {
-    return -1;
-  }
-
-  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
-  if (buf == NULL) {
-    return -1;
-  }
-
-  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
-  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
-
-  SEncoder encoder;
-  tEncoderInit(&encoder, abuf, tlen);
-  if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
-    rpcFreeCont(buf);
-    return code;
-  }
-  tEncoderClear(&encoder);
-
-  ASSERT(req.upstreamTaskId != 0);
+int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId,
+                                int32_t childId, SEpSet* pEpset, int64_t checkpointId) {
+  ASSERT(upstreamTaskId != 0);
pReadyInfo->upstreamTaskId = upstreamTaskId;
   pReadyInfo->upstreamNodeEpset = *pEpset;
-  pReadyInfo->nodeId = req.upstreamNodeId;
+  pReadyInfo->upstreamNodeId = upstreamNodeId;
   pReadyInfo->recvTs = taosGetTimestampMs();
-  pReadyInfo->checkpointId = req.checkpointId;
+  pReadyInfo->checkpointId = checkpointId;
+  pReadyInfo->childId = childId;
 
-  initRpcMsg(&pReadyInfo->msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
   return TSDB_CODE_SUCCESS;
 }
 
@@ -872,7 +973,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
 
   SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
 
   STaskCheckpointReadyInfo info = {0};
-  initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
+  initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
 
   stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d",
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index ae1d86e317..f6449829a3 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -1121,6 +1121,7 @@ void metaHbToMnode(void* param, void* tmrId) {
 
   SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
   if (pMeta == NULL) {
+    stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid);
     return;
   }
 
@@ -1341,8 +1342,8 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
   streamMetaWUnLock(pMeta);
 
   if (isLeader) {
-    stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
-           prevStage, stage, isLeader);
+    stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
+           pMeta->vgId, prevStage, stage, isLeader, pMeta->rid);
     streamMetaStartHb(pMeta);
   } else {
     stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index f6524a69ab..7fb45a884c 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -999,9 +999,15 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
   pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList);
   pInfo->pCheckpointReadyRecvList = taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
 
-  if (pInfo->pCheckTmr != NULL) {
-    taosTmrStop(pInfo->pCheckTmr);
-    pInfo->pCheckTmr = NULL;
+  if (pInfo->pChkptTriggerTmr != NULL) {
+    taosTmrStop(pInfo->pChkptTriggerTmr);
+    pInfo->pChkptTriggerTmr = NULL;
+  }
+
+  // the checkpoint-ready re-send monitor has its own timer handle; stop it too before the struct is freed
+  if (pInfo->pSendReadyMsgTmr != NULL) {
+    taosTmrStop(pInfo->pSendReadyMsgTmr);
+    pInfo->pSendReadyMsgTmr = NULL;
   }
 
   taosMemoryFree(pInfo);
@@ -1014,7 +1020,6 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
   pInfo->allUpstreamTriggerRecv = 0;
   pInfo->dispatchTrigger = false;
 
-  taosArrayClear(pInfo->pReadyMsgList);
   taosArrayClear(pInfo->pDispatchTriggerList);
   taosArrayClear(pInfo->pCheckpointReadyRecvList);
 }
\ No newline at end of file