From dfa74f82d78b4bfbfcbba4768cf8d0794de45ccc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Aug 2024 00:40:05 +0800 Subject: [PATCH] fix(stream): avoid repeat send checkpoint-report msg. --- source/dnode/mnode/impl/inc/mndStream.h | 12 ++- source/dnode/mnode/impl/src/mndStream.c | 104 ++++++++++++++------ source/dnode/mnode/impl/src/mndStreamHb.c | 6 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 68 +++++++++---- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 23 +++-- 6 files changed, 155 insertions(+), 60 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 89343ce37c..a5d91c8aa8 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -57,6 +57,12 @@ typedef struct SStreamTaskResetMsg { int32_t transId; } SStreamTaskResetMsg; +typedef struct SChkptReportInfo { + SArray* pTaskList; + int64_t reportChkpt; + int64_t streamId; +} SChkptReportInfo; + typedef struct SStreamExecInfo { bool initTaskList; SArray *pNodeList; @@ -66,9 +72,9 @@ typedef struct SStreamExecInfo { SArray *pTaskList; TdThreadMutex lock; SHashObj *pTransferStateStreams; - SHashObj *pChkptStreams; + SHashObj *pChkptStreams; // use to update the checkpoint info, if all tasks send the checkpoint-report msgs SHashObj *pStreamConsensus; - SArray *pKilledChkptTrans; // SArray + SArray *pKilledChkptTrans; // SArray } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -153,6 +159,8 @@ int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTask void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); +int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId); +int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 90ef7daa60..a8d35993c7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2454,8 +2454,45 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { return 0; } -static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) { - bool existed = false; +// valid the info according to the HbMsg +static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) { + STaskId id = {.streamId = pReport->streamId, .taskId = pReport->taskId}; + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pTaskEntry == NULL) { + mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId); + return false; + } + + if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) { + mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard", + pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId); + return false; + } + + // now the task in checkpoint procedure + if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) { + mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64 + " discard", + pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId); + return false; + } + + if (reportChkptId >= pReport->checkpointId) { + mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64 + " discard", + pReport->taskId, pReport->checkpointId, reportChkptId); + return false; + } + + return true; +} + +static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) { + bool valid = validateChkptReport(pReport, reportChkptId); + if (!valid) { + return; + } + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { STaskChkptInfo *p = taosArrayGet(pList, i); if (p == NULL) { @@ -2463,27 +2500,38 @@ static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pRepor } if (p->taskId == pReport->taskId) { - existed = true; - break; + if (p->checkpointId > pReport->checkpointId) { + mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard", + pReport->taskId, p->checkpointId, pReport->checkpointId); + } else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it + mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, + pReport->taskId, p->checkpointId, pReport->checkpointId); + + memcpy(p, pReport, sizeof(STaskChkptInfo)); + } else { + mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId); + } + return; } } - if (!existed) { - STaskChkptInfo info = { - .streamId = pReport->streamId, - .taskId = pReport->taskId, - .transId = pReport->transId, - .dropHTask = pReport->dropHTask, - .version = pReport->checkpointVer, - .ts = pReport->checkpointTs, - .checkpointId = pReport->checkpointId, - .nodeId = pReport->nodeId, - }; + STaskChkptInfo info = { + .streamId = pReport->streamId, + .taskId = pReport->taskId, + .transId = pReport->transId, + .dropHTask = pReport->dropHTask, + .version = pReport->checkpointVer, + .ts = pReport->checkpointTs, + .checkpointId = pReport->checkpointId, + .nodeId = pReport->nodeId, + }; - void* p = taosArrayPush(pList, &info); - if (p == NULL) { - mError("failed to put into task list, taskId:0x%x", pReport->taskId); - } + void *p = taosArrayPush(pList, &info); + if (p == NULL) { + mError("failed to put into task list, taskId:0x%x", pReport->taskId); + } else { + int32_t size = taosArrayGetSize(pList); + mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size); } } @@ -2530,23 +2578,23 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); - SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); - if (pReqTaskList == NULL) { - SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); - if (pList != NULL) { - doAddReportStreamTask(pList, &req); - code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + if (pInfo == NULL) { + SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId}; + if (info.pTaskList != NULL) { + doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req); + code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info)); if (code) { mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId); } - pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } } else { - doAddReportStreamTask(*pReqTaskList, &req); + doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req); } - int32_t total = taosArrayGetSize(*pReqTaskList); + int32_t total = taosArrayGetSize(pInfo->pTaskList); if (total == numOfTasks) { // all tasks has send the reqs mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 " will be issued soon", diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 50db903520..59f07ce977 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -211,6 +211,10 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { SStreamTaskResetMsg* pMsg = pReq->pCont; mndKillTransImpl(pMnode, pMsg->transId, ""); + streamMutexLock(&execInfo.lock); + (void) mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); + streamMutexUnlock(&execInfo.lock); + code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream); if (pStream == NULL || code != 0) { code = TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -453,7 +457,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { addIntoCheckpointList(pFailedChkpt, &info); // remove failed trans from pChkptStreams - code = taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); + code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); if (code) { mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 739bb0ca37..649cab91c1 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -904,8 +904,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); - // 2. remove stream entry in consensus hash table + // 2. remove stream entry in consensus hash table and checkpoint-report hash table (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + (void) mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid); streamMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); @@ -973,9 +974,8 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + tstrerror(terrno)); return terrno; } @@ -983,12 +983,14 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); - ASSERT(pReqTaskList); + SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); + if (pStreamItem == NULL) { + return TSDB_CODE_INVALID_PARA; + } - int32_t size = taosArrayGetSize(*pReqTaskList); + int32_t size = taosArrayGetSize(pStreamItem->pTaskList); for(int32_t i = 0; i < size; ++i) { - STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); + STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i); if (pInfo == NULL) { continue; } @@ -1063,11 +1065,12 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { } mDebug("start to scan checkpoint report info"); + streamMutexLock(&execInfo.lock); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { - SArray *pList = *(SArray **)pIter; + SChkptReportInfo* px = (SChkptReportInfo *)pIter; - STaskChkptInfo *pInfo = taosArrayGet(pList, 0); + STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0); if (pInfo == NULL) { continue; } @@ -1080,12 +1083,11 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { if (p == NULL) { mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId); } - continue; } int32_t total = mndGetNumOfStreamTasks(pStream); - int32_t existed = (int32_t)taosArrayGetSize(pList); + int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList); if (total == existed) { mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", @@ -1093,14 +1095,11 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); if (!conflict) { - code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); + code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry - void* p = taosArrayPush(pDropped, &pInfo->streamId); - if (p == NULL) { - mError("failed to remove stream:0x%" PRIx64, pInfo->streamId); - } else { - mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); - } + taosArrayClear(px->pTaskList); + px->reportChkpt = pInfo->checkpointId; + mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId); } else { mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", pInfo->streamId); @@ -1135,6 +1134,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); } + streamMutexUnlock(&execInfo.lock); + taosArrayDestroy(pDropped); return TSDB_CODE_SUCCESS; } @@ -1319,7 +1320,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { - return TSDB_CODE_SUCCESS; + return code; } code = taosHashRemove(pHash, &streamId, sizeof(streamId)); @@ -1332,6 +1333,35 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { return code; } +int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) { + int32_t code = 0; + int32_t numOfStreams = taosHashGetSize(pHash); + if (numOfStreams == 0) { + return code; + } + + code = taosHashRemove(pHash, &streamId, sizeof(streamId)); + if (code == 0) { + mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams); + } else { + mError("failed to remove stream:0x%"PRIx64" in chkpt-report list, remain:%d", streamId, numOfStreams); + } + + return code; +} + +int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) { + SChkptReportInfo* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); + if (pInfo != NULL) { + taosArrayClear(pInfo->pTaskList); + mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId, + pInfo->reportChkpt); + return 0; + } + + return TSDB_CODE_MND_STREAM_NOT_EXIST; +} + static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { int8_t status = atomic_load_8(&pStream->status); if (status == STREAM_STATUS__NORMAL) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ba911fa76d..faca2020c5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -563,7 +563,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); } - code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId); + code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId); streamMetaReleaseTask(pMeta, pTask); if (code) { return code; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6c0f8ec6cb..65e5c475b4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -361,7 +361,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock (void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by @@ -376,8 +375,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream, int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId, - const char* id, int32_t* pNotReady, int32_t* pTransId) { - bool received = false; + const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) { + *alreadyRecv = false; int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); @@ -386,12 +385,12 @@ static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t } if (p->downstreamTaskId == downstreamTaskId) { - received = true; + (*alreadyRecv) = true; break; } } - if (received) { + if (*alreadyRecv) { stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), numOfDownstream); @@ -427,6 +426,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t code = 0; int32_t notReady = 0; int32_t transId = 0; + bool alreadyHandled = false; // 1. not in checkpoint status now SStreamTaskState pStat = streamTaskGetStatus(pTask); @@ -445,12 +445,17 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId streamMutexLock(&pInfo->lock); code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, ¬Ready, - &transId); + &transId, &alreadyHandled); streamMutexUnlock(&pInfo->lock); - if ((notReady == 0) && (code == 0)) { - stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); - (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); + if (alreadyHandled) { + stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again", + id, checkpointId, downstreamTaskId); + } else { + if ((notReady == 0) && (code == 0) && (!alreadyHandled)) { + stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); + (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); + } } return code;