diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index ec04aa3111..fc1c95a3b3 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +bool isNodeUpdateTransActive(); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter); @@ -175,8 +176,8 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); -int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); -int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); +int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); +int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4f5198acc0..30953736eb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -65,8 +65,6 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq); static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); -static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); -static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -801,6 +799,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); } + // check for the taskEp update trans + if (isNodeUpdateTransActive()) { + mError("stream:%s failed to create stream, node update trans is active", createReq.name); + code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + goto _OVER; + } + SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB); if (pSourceDb == NULL) { code = terrno; @@ -2416,8 +2421,8 @@ static bool validateChkptReport(const SCheckpointReport *pReport, int64_t report return true; } -static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) { - bool valid = validateChkptReport(pReport, reportChkptId); +static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) { + bool valid = validateChkptReport(pReport, reportedChkptId); if (!valid) { return; } @@ -2433,7 +2438,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard", pReport->taskId, p->checkpointId, pReport->checkpointId); } else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it - mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, + mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64, pReport->taskId, p->checkpointId, pReport->checkpointId); // update the checkpoint report info @@ -2465,7 +2470,8 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC mError("failed to put into task list, taskId:0x%x", pReport->taskId); } else { int32_t size = taosArrayGetSize(pList); - mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size); + mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report", + pReport->streamId, pReport->taskId, size); } } @@ -2491,7 +2497,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { " checkpointVer:%" PRId64 " transId:%d", req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); - // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + // register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans. streamMutexLock(&execInfo.lock); SStreamObj *pStream = NULL; @@ -2500,7 +2506,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); // not in meta-store yet, try to acquire the task in exec buffer - // the checkpoint req arrives too soon before the completion of the create stream trans. + // the checkpoint req arrives too soon before the completion of the creation of stream trans. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { @@ -2533,7 +2539,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { } int32_t total = taosArrayGetSize(pInfo->pTaskList); - if (total == numOfTasks) { // all tasks has send the reqs + if (total == numOfTasks) { // all tasks have sent the reqs mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 " will be issued soon", req.streamId, pStream->name, total, req.checkpointId); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index a1e104aeca..fe3359dc74 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -292,6 +292,25 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg return mndTransAppendRedoAction(pTrans, &action); } +bool isNodeUpdateTransActive() { + bool exist = false; + void *pIter = NULL; + + streamMutexLock(&execInfo.lock); + + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; + if (strcmp(pTransInfo->name, MND_STREAM_TASK_UPDATE_NAME) == 0) { + mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed", + pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime); + exist = true; + } + } + + streamMutexUnlock(&execInfo.lock); + return exist; +} + int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { void *pIter = NULL; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7a38e68744..d896434f3b 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -658,6 +658,72 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { return 0; } +static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) { + int64_t checkpointId = -1; + int32_t transId = -1; + int32_t taskId = -1; + + int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList); + if (existed != numOfTasks) { + mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName, + existed, numOfTasks, numOfTasks - existed); + return -1; + } + + // acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList + for(int32_t i = 0; i < numOfTasks; ++i) { + STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i); + if (pInfo == NULL) { + continue; + } + + if (checkpointId == -1) { + checkpointId = pInfo->checkpointId; + transId = pInfo->transId; + taskId = pInfo->taskId; + } else if (checkpointId != pInfo->checkpointId) { + mError("stream:0x%" PRIx64 + " checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64 + ", type 2 taskId:0x%x checkpointId:%" PRId64, + pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId); + return -1; + } + } + + // check for the correct checkpointId for current task info in STaskChkptInfo + STaskChkptInfo *p = taosArrayGet(pReportInfo->pTaskList, 0); + STaskId id = {.streamId = p->streamId, .taskId = p->taskId}; + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + + // cross-check failed, there must be something unknown wrong + SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId)); + if (pTransInfo == NULL) { + mWarn("stream:0x%" PRIx64 " no active trans exists for checkpoint transId:%d, it may have been cleared already", + id.streamId, transId); + + if (pe->checkpointInfo.activeId != 0 && pe->checkpointInfo.activeId != checkpointId) { + mWarn("stream:0x%" PRIx64 " active checkpointId is not equalled to the required, current:%" PRId64 + ", req:%" PRId64 " recheck next time", + id.streamId, pe->checkpointInfo.activeId, checkpointId); + return -1; + } else { + // do nothing + } + } else { + if (pTransInfo->transId != transId) { + mError("stream:0x%" PRIx64 + " checkpoint-report list info are expired, active transId:%d trans in list:%d, recheck next time", + id.streamId, pTransInfo->transId, transId); + return -1; + } + } + + mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId, + pName, numOfTasks); + + return TSDB_CODE_SUCCESS; +} + int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; void *pIter = NULL; @@ -668,6 +734,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { } mDebug("start to scan checkpoint report info"); + streamMutexLock(&execInfo.lock); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { @@ -693,30 +760,27 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { } int32_t total = mndGetNumOfStreamTasks(pStream); - int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList); - - if (total == existed) { - mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", - pStream->uid, pStream->name, total); - + int32_t ret = allTasksSendChkptReport(px, total, pStream->name); + if (ret == 0) { code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); if (code == 0) { code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry taosArrayClear(px->pTaskList); + mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64 + " to %" PRId64, + pInfo->streamId, px->reportChkpt, pInfo->checkpointId); px->reportChkpt = pInfo->checkpointId; - mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId); } else { - mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", + mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet", pInfo->streamId); } + + sdbRelease(pMnode->pSdb, pStream); break; } else { mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId); } - } else { - mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, - existed, total, total - existed); } sdbRelease(pMnode->pSdb, pStream); @@ -743,6 +807,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { streamMutexUnlock(&execInfo.lock); taosArrayDestroy(pDropped); + + mDebug("end to scan checkpoint report info") return TSDB_CODE_SUCCESS; } @@ -836,7 +902,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()}; memcpy(&info.req, pRestoreInfo, sizeof(info.req)); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList); + for (int32_t i = 0; i < num; ++i) { SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); if (p == NULL) { continue; @@ -844,10 +911,12 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo if (p->req.taskId == info.req.taskId) { mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 - "->%" PRId64 " total existed:%d", - pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, - (int32_t)taosArrayGetSize(pInfo->pTaskList)); + "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId, + info.req.checkpointId, num); p->req.startTs = info.req.startTs; + p->req.checkpointId = info.req.checkpointId; + p->req.transId = info.req.transId; return; } } @@ -856,7 +925,7 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo if (p == NULL) { mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId); } else { - int32_t num = taosArrayGetSize(pInfo->pTaskList); + num = taosArrayGetSize(pInfo->pTaskList); mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d", pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); @@ -868,7 +937,7 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) { pInfo->pTaskList = NULL; } -int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { +int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { @@ -885,7 +954,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { return code; } -int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { +int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d3eba382c9..1a49e50547 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -604,6 +604,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV streamMutexLock(&pTask->lock); + // not update the checkpoint info if the checkpointId is less than the failed checkpointId + if (pReq->checkpointId < pInfo->pActiveInfo->failedId) { + stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64 + " is less than the failed checkpointId:%" PRId64 ", discard the update info", + id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId); + streamMutexUnlock(&pTask->lock); + + // always return true + return TSDB_CODE_SUCCESS; + } + if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", @@ -638,9 +649,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64