diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 4f5198acc0..4f75aae42e 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -2416,8 +2416,8 @@ static bool validateChkptReport(const SCheckpointReport *pReport, int64_t report
   return true;
 }
 
-static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
-  bool valid = validateChkptReport(pReport, reportChkptId);
+static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
+  bool valid = validateChkptReport(pReport, reportedChkptId);
   if (!valid) {
     return;
   }
@@ -2433,7 +2433,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
         mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
                pReport->taskId, p->checkpointId, pReport->checkpointId);
       } else if (p->checkpointId < pReport->checkpointId) {  // expired checkpoint-report msg, update it
-        mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
+        mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
                pReport->taskId, p->checkpointId, pReport->checkpointId);
 
         // update the checkpoint report info
@@ -2465,7 +2465,8 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
     mError("failed to put into task list, taskId:0x%x", pReport->taskId);
   } else {
     int32_t size = taosArrayGetSize(pList);
-    mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size);
+    mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
+           pReport->streamId, pReport->taskId, size);
   }
 }
 
@@ -2491,7 +2492,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
         " checkpointVer:%" PRId64 " transId:%d",
         req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
 
-  // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
+  // register to the stream task done map, if all tasks have sent this kind of message, start the checkpoint trans.
   streamMutexLock(&execInfo.lock);
 
   SStreamObj *pStream = NULL;
@@ -2500,7 +2501,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
     mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
 
     // not in meta-store yet, try to acquire the task in exec buffer
-    // the checkpoint req arrives too soon before the completion of the create stream trans.
+    // the checkpoint req arrives too soon before the completion of the creation of stream trans.
     STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
     void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
     if (p == NULL) {
@@ -2533,7 +2534,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
   }
 
   int32_t total = taosArrayGetSize(pInfo->pTaskList);
-  if (total == numOfTasks) {  // all tasks has send the reqs
+  if (total == numOfTasks) {  // all tasks have sent the reqs
     mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
           " will be issued soon",
           req.streamId, pStream->name, total, req.checkpointId);
diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c
index 7a38e68744..69ab56429c 100644
--- a/source/dnode/mnode/impl/src/mndStreamUtil.c
+++ b/source/dnode/mnode/impl/src/mndStreamUtil.c
@@ -658,6 +658,84 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
   return 0;
 }
 
+/**
+ * Check whether every task of a stream has delivered a consistent checkpoint-report.
+ *
+ * The report list is accepted only if it holds exactly numOfTasks entries, all entries carry the same
+ * checkpointId, and the transId in the list equals the transId registered for the stream in
+ * execInfo.transMgmt.pDBTrans. When the registered trans is missing or differs, the list is cleared.
+ * The only caller visible here, mndScanCheckpointReportInfo, invokes this while holding execInfo.lock.
+ *
+ * @param pReportInfo checkpoint-report info of one stream, pTaskList may be cleared by this call
+ * @param numOfTasks  number of tasks that are expected to report
+ * @param pName       stream name, only used in log messages
+ * @return TSDB_CODE_SUCCESS if the checkpoint-info update may proceed, -1 otherwise
+ */
+static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
+  int64_t checkpointId = -1;
+  int32_t transId = -1;
+  int32_t taskId = -1;
+
+  int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
+  if (existed != numOfTasks) {
+    mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
+           existed, numOfTasks, numOfTasks - existed);
+    return -1;
+  }
+
+  // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
+    if (pInfo == NULL) {
+      continue;
+    }
+
+    if (checkpointId == -1) {
+      checkpointId = pInfo->checkpointId;
+      transId = pInfo->transId;
+      taskId = pInfo->taskId;
+    } else if (checkpointId != pInfo->checkpointId) {
+      mError("stream:0x%" PRIx64
+             " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
+             ", type 2 taskId:0x%x checkpointId:%" PRId64,
+             pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
+      return -1;
+    }
+  }
+
+  // the first entry provides the streamId used to look up the registered trans of this stream
+  STaskChkptInfo *p = taosArrayGet(pReportInfo->pTaskList, 0);
+  if (p == NULL) {  // no first entry available (e.g., numOfTasks is 0), nothing to verify against
+    mError("stream:0x%" PRIx64 " %s no entry in checkpoint-report list, not update checkpoint-info",
+           pReportInfo->streamId, pName);
+    return -1;
+  }
+
+  STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
+
+  // no trans registered for this stream: the cross-check failed, there must be something unknown wrong
+  SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
+  if (pTransInfo == NULL) {
+    mError("stream:0x%" PRIx64 " no active exists for checkpoint transId:%d, clear checkpoint-report list",
+           id.streamId, transId);
+    taosArrayClear(pReportInfo->pTaskList);
+    return -1;
+  }
+
+  if (pTransInfo->transId != transId) {
+    mError("stream:0x%" PRIx64
+           " checkpoint-report list info are expired, clear and retry, active transId:%d trans in list:%d",
+           id.streamId, pTransInfo->transId, transId);
+    taosArrayClear(pReportInfo->pTaskList);
+    return -1;
+  }
+
+  mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
+         pName, numOfTasks);
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
   SMnode *pMnode = pReq->info.node;
   void *pIter = NULL;
@@ -668,6 +746,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
   }
 
   mDebug("start to scan checkpoint report info");
+
   streamMutexLock(&execInfo.lock);
 
   while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
@@ -693,30 +772,27 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
     }
 
     int32_t total = mndGetNumOfStreamTasks(pStream);
-    int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList);
-
-    if (total == existed) {
-      mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
-             pStream->uid, pStream->name, total);
-
+    int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
+    if (ret == 0) {
       code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
       if (code == 0) {
         code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
         if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {  // remove this entry
           taosArrayClear(px->pTaskList);
+          mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
+                " to %" PRId64,
+                pInfo->streamId, px->reportChkpt, pInfo->checkpointId);
           px->reportChkpt = pInfo->checkpointId;
-          mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId);
         } else {
-          mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
+          mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
                  pInfo->streamId);
         }
+
+        sdbRelease(pMnode->pSdb, pStream);
         break;
       } else {
         mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
       }
-    } else {
-      mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
-             existed, total, total - existed);
     }
 
     sdbRelease(pMnode->pSdb, pStream);
@@ -743,6 +819,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
 
   streamMutexUnlock(&execInfo.lock);
   taosArrayDestroy(pDropped);
+
+  mDebug("end to scan checkpoint report info");
   return TSDB_CODE_SUCCESS;
 }
 