diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0f546ee869..99ca2104ff 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -753,7 +753,7 @@ tmr_h streamTimerGetInstance(); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp); -int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId); int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); int32_t streamAlignTransferState(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c8ca324c5e..b779cbe932 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -492,7 +492,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); - streamProcessCheckpointReadyMsg(pTask); + streamProcessCheckpointReadyMsg(pTask, req.downstreamTaskId, req.downstreamNodeId); streamMetaReleaseTask(pMeta, pTask); { // send checkpoint ready rsp diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 9a3cbdc963..c943f663e6 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -60,9 +60,9 @@ struct SActiveCheckpointInfo { int64_t failedId; bool dispatchTrigger; SArray* pDispatchTriggerList; // SArray - SArray* pReadyMsgList; // SArray + SArray* pReadyMsgList; // SArray int8_t allUpstreamTriggerRecv; - + SArray* pCheckpointReadyRecvList; // SArray int32_t checkCounter; tmr_h pCheckTmr; }; @@ -97,14 +97,14 @@ struct STokenBucket { }; typedef struct { - int32_t upStreamTaskId; + int32_t upstreamTaskId; SEpSet upstreamNodeEpset; int32_t nodeId; SRpcMsg msg; int64_t recvTs; int32_t transId; int64_t checkpointId; -} SStreamChkptReadyInfo; +} STaskCheckpointReadyInfo; typedef struct { int64_t sendTs; @@ -114,6 +114,15 @@ typedef struct { int32_t taskId; } STaskTriggerSendInfo; +typedef struct { + int64_t streamId; + int64_t recvTs; + int32_t downstreamNodeId; + int32_t downstreamTaskId; + int64_t checkpointId; + int32_t transId; +} STaskCheckpointReadyRecvInfo; + struct SStreamQueue { STaosQueue* pQueue; STaosQall* qall; @@ -203,6 +212,9 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask); +int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId, + int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId); + typedef int32_t (*__stream_async_exec_fn_t)(void* param); int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7f41629dac..6b6b740f01 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -182,7 +182,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; taosThreadMutexLock(&pTask->lock); - if (pTask->chkInfo.checkpointId >= checkpointId) { + if (pTask->chkInfo.checkpointId > checkpointId) { stError("s-task:%s vgId:%d current checkpointId:%" PRId64 " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); @@ -190,6 +190,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return TSDB_CODE_SUCCESS; } + if (pTask->chkInfo.checkpointId == checkpointId) { + { // send checkpoint-ready msg to upstream + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId); + + STaskCheckpointReadyInfo info = {0}; + initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); + + tmsgSendReq(&info.upstreamNodeEpset, &info.msg); + } + + stWarn( + "s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the " + "interrupted checkpoint", + id, vgId, pBlock->srcTaskId); + + streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { if (pActiveInfo->activeId != checkpointId) { stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 @@ -210,12 +230,12 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { // check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { - SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); - if (p->upStreamTaskId == pBlock->srcTaskId) { + STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); + if (p->upstreamTaskId == pBlock->srcTaskId) { ASSERT(p->checkpointId == checkpointId); stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", - pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId, p->recvTs); + pTask->id.idStr, p->upstreamTaskId, p->nodeId, p->checkpointId, p->recvTs); taosThreadMutexUnlock(&pTask->lock); return TSDB_CODE_SUCCESS; @@ -262,8 +282,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1); - streamProcessCheckpointReadyMsg(pTask); + streamProcessCheckpointReadyMsg(pTask, 0, 0); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -307,23 +326,47 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock * All down stream tasks have successfully completed the check point task. * Current stream task is allowed to start to do checkpoint things in ASYNC model. */ -int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + const char* id = pTask->id.idStr; + bool received = false; + int32_t total = streamTaskGetNumOfDownstream(pTask); + + taosThreadMutexLock(&pInfo->lock); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task - int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.numOfNotReady, 1); - ASSERT(notReady >= 0); - - if (notReady == 0) { - stDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task", - pTask->id.idStr); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId); - } else { - int32_t total = streamTaskGetNumOfDownstream(pTask); - stDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total); + int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); + for (int32_t i = 0; i < size; ++i) { + STaskCheckpointReadyRecvInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); + if (p->downstreamTaskId == downstreamTaskId) { + received = true; + break; + } } + if (received) { + stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, %d/%d downstream not ready", id, + downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); + } else { + STaskCheckpointReadyRecvInfo info = {.recvTs = taosGetTimestampMs(), + .downstreamTaskId = downstreamTaskId, + .checkpointId = pInfo->activeId, + .transId = pInfo->transId, + .streamId = pTask->id.streamId, + .downstreamNodeId = downstreamNodeId}; + taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + } + + int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); + if (notReady == 0) { + stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", + id); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId); + } + + taosThreadMutexUnlock(&pInfo->lock); return 0; } @@ -685,7 +728,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { bool recved = false; for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) { - SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j); + STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j); if (pInfo->nodeId == pReady->nodeId) { recved = true; break; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index f134224196..aa0d7c3120 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -633,11 +633,11 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); for (int32_t i = 0; i < num; ++i) { - SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, i); + STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg); stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel, - pInfo->upStreamTaskId); + pInfo->upstreamTaskId); } taosArrayClear(pList); @@ -657,7 +657,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); if (taosArrayGetSize(pList) == 1) { - SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, 0); + STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0); tmsgSendRsp(&pInfo->msg); taosArrayClear(pList); @@ -785,7 +785,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp } int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { - SStreamChkptReadyInfo info = { + STaskCheckpointReadyInfo info = { .recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId}; streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); @@ -797,7 +797,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa if (size > 0) { ASSERT(size == 1); - SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); + STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); if (pReady->transId == pReq->transId) { stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore", pTask->id.idStr, pReq->checkpointId); @@ -816,24 +816,20 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa return TSDB_CODE_SUCCESS; } -int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { +int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId, + int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) { int32_t code = 0; int32_t tlen = 0; void* buf = NULL; - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - return TSDB_CODE_SUCCESS; - } - - SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); SStreamCheckpointReadyMsg req = {0}; req.downstreamNodeId = pTask->pMeta->vgId; req.downstreamTaskId = pTask->id.taskId; req.streamId = pTask->id.streamId; req.checkpointId = checkpointId; - req.childId = pInfo->childId; - req.upstreamNodeId = pInfo->nodeId; - req.upstreamTaskId = pInfo->taskId; + req.childId = childId; + req.upstreamNodeId = upstreamNodeId; + req.upstreamTaskId = upstreamTaskId; tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code); if (code < 0) { @@ -858,20 +854,29 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, ASSERT(req.upstreamTaskId != 0); - SStreamChkptReadyInfo info = { - .upStreamTaskId = pInfo->taskId, - .upstreamNodeEpset = pInfo->epSet, - .nodeId = req.upstreamNodeId, - .recvTs = taosGetTimestampMs(), - .checkpointId = req.checkpointId, - }; + pReadyInfo->upstreamTaskId = upstreamTaskId; + pReadyInfo->upstreamNodeEpset = *pEpset; + pReadyInfo->nodeId = req.upstreamNodeId; + pReadyInfo->recvTs = taosGetTimestampMs(); + pReadyInfo->checkpointId = req.checkpointId; - initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); + initRpcMsg(&pReadyInfo->msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); + return TSDB_CODE_SUCCESS; +} + +int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + return TSDB_CODE_SUCCESS; + } + + SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); + + STaskCheckpointReadyInfo info = {0}; + initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 - ":0x%x (vgId:%d) idx:%d, vgId:%d", - pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, - req.upstreamNodeId); + "-0x%x (vgId:%d) idx:%d", + pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index); SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -899,7 +904,7 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { } for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) { - SStreamChkptReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i); + STaskCheckpointReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i); rpcFreeCont(pInfo->msg.pCont); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01e2f89d8c..8084a978ef 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -984,7 +984,8 @@ SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() { taosThreadMutexInit(&pInfo->lock, NULL); pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo)); - pInfo->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); + pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo)); + pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskCheckpointReadyRecvInfo)); return pInfo; } @@ -996,6 +997,7 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosThreadMutexDestroy(&pInfo->lock); pInfo->pDispatchTriggerList = taosArrayDestroy(pInfo->pDispatchTriggerList); pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList); + pInfo->pCheckpointReadyRecvList = taosArrayDestroy(pInfo->pCheckpointReadyRecvList); if (pInfo->pCheckTmr != NULL) { taosTmrStop(pInfo->pCheckTmr); @@ -1014,4 +1016,5 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { taosArrayClear(pInfo->pReadyMsgList); taosArrayClear(pInfo->pDispatchTriggerList); + taosArrayClear(pInfo->pCheckpointReadyRecvList); } \ No newline at end of file