diff --git a/include/common/rsync.h b/include/common/rsync.h index 5bb98f9eab..30f542857e 100644 --- a/include/common/rsync.h +++ b/include/common/rsync.h @@ -13,8 +13,8 @@ extern "C" { void stopRsync(); int32_t startRsync(); -int32_t uploadByRsync(const char* id, const char* path); -int32_t downloadRsync(const char* id, const char* path); +int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId); +int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId); int32_t deleteRsync(const char* id); #ifdef __cplusplus diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index 34921daac3..0ceaa93a72 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -164,6 +164,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint typedef struct SStreamHbMsg { int32_t vgId; int32_t msgId; + int64_t ts; int32_t numOfTasks; SArray* pTaskStatus; // SArray SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index c7044864ae..b53c4e416e 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -163,7 +163,7 @@ int32_t startRsync() { return code; } -int32_t uploadByRsync(const char* id, const char* path) { +int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) { int64_t st = taosGetTimestampMs(); char command[PATH_MAX] = {0}; @@ -203,12 +203,12 @@ int32_t uploadByRsync(const char* id, const char* path) { // prepare the data directory int32_t code = execCommand(command); if (code != 0) { - uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, + uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, tsSnodeAddress, code, ERRNO_ERR_DATA); code = TAOS_SYSTEM_ERROR(errno); } else { int64_t el = (taosGetTimestampMs() - st); - uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, + uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, tsSnodeAddress, el); } @@ -222,7 +222,7 @@ int32_t uploadByRsync(const char* id, const char* path) { #endif snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " - "rsync://%s/checkpoint/%s/data/", + "rsync://%s/checkpoint/%s/%" PRId64 "/", tsLogDir, #ifdef WINDOWS pathTransform @@ -230,11 +230,11 @@ int32_t uploadByRsync(const char* id, const char* path) { path #endif , - tsSnodeAddress, id); + tsSnodeAddress, id, checkpointId); } else { snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " - "rsync://%s/checkpoint/%s/data/", + "rsync://%s/checkpoint/%s/%" PRId64 "/", tsLogDir, #ifdef WINDOWS pathTransform @@ -242,7 +242,7 @@ int32_t uploadByRsync(const char* id, const char* path) { path #endif , - tsSnodeAddress, id); + tsSnodeAddress, id, checkpointId); } code = execCommand(command); @@ -260,7 +260,7 @@ int32_t uploadByRsync(const char* id, const char* path) { } // abort from retry if quit -int32_t downloadRsync(const char* id, const char* path) { +int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) { int64_t st = taosGetTimestampMs(); int32_t MAX_RETRY = 10; int32_t times = 0; @@ -274,8 +274,9 @@ int32_t downloadRsync(const char* id, const char* path) { char command[PATH_MAX] = {0}; snprintf( command, PATH_MAX, - "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", - tsLogDir, tsSnodeAddress, id, + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64 + "/ %s", + tsLogDir, tsSnodeAddress, id, checkpointId, #ifdef WINDOWS pathTransform #else @@ -283,19 +284,49 @@ int32_t downloadRsync(const char* id, const char* path) { #endif ); - uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); + uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command); + + code = execCommand(command); + if (code != TSDB_CODE_SUCCESS) { + uError("[rsync] %s download checkpointId:%" PRId64 + " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, + id, checkpointId, path, times, code, ERRNO_ERR_DATA); + } else { + int32_t el = taosGetTimestampMs() - st; + uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId, + path, el); + } + + if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory +#ifdef WINDOWS + memset(pathTransform, 0, PATH_MAX); + changeDirFromWindowsToLinux(path, pathTransform); +#endif + + memset(command, 0, PATH_MAX); + snprintf( + command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", + tsLogDir, tsSnodeAddress, id, +#ifdef WINDOWS + pathTransform +#else + path +#endif + ); + + uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command); - while (times++ < MAX_RETRY) { code = execCommand(command); if (code != TSDB_CODE_SUCCESS) { - uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id, - path, times, code, ERRNO_ERR_DATA); - taosSsleep(1); - code = TAOS_SYSTEM_ERROR(errno); + uError("[rsync] %s download checkpointId:%" PRId64 + " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, + id, checkpointId, path, times, code, ERRNO_ERR_DATA); + code = TAOS_SYSTEM_ERROR(code); } else { int32_t el = taosGetTimestampMs() - st; - uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el); - break; + uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId, + path, el); } } return code; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 6de9665db9..8186fcd4e9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -109,7 +109,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { int32_t svrVer = 0; (void)taosVersionStrToInt(version, &svrVer); if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) { - dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); + dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer, pRpc->info.conn.clientIp); goto _OVER; } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index d713de5158..a5d91c8aa8 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -57,6 +57,12 @@ typedef struct SStreamTaskResetMsg { int32_t transId; } SStreamTaskResetMsg; +typedef struct SChkptReportInfo { + SArray* pTaskList; + int64_t reportChkpt; + int64_t streamId; +} SChkptReportInfo; + typedef struct SStreamExecInfo { bool initTaskList; SArray *pNodeList; @@ -66,9 +72,9 @@ typedef struct SStreamExecInfo { SArray *pTaskList; TdThreadMutex lock; SHashObj *pTransferStateStreams; - SHashObj *pChkptStreams; + SHashObj *pChkptStreams; // use to update the checkpoint info, if all tasks send the checkpoint-report msgs SHashObj *pStreamConsensus; - SArray *pKilledChkptTrans; // SArray + SArray *pKilledChkptTrans; // SArray } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -79,6 +85,8 @@ typedef struct SNodeEntry { bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. int64_t hbTimestamp; // second + int32_t lastHbMsgId; // latest hb msgId + int64_t lastHbMsgTs; } SNodeEntry; typedef struct { @@ -151,6 +159,8 @@ int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTask void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); +int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId); +int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c6b4de74d1..dc8f494914 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2139,7 +2139,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi break; } - SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; epsetAssign(&entry.epset, &pTask->info.epSet); (void)taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); } @@ -2319,7 +2319,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) } if (!exist) { - SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; epsetAssign(&nodeEntry.epset, &pTask->info.epSet); void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry); @@ -2454,8 +2454,45 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { return 0; } -static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) { - bool existed = false; +// valid the info according to the HbMsg +static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) { + STaskId id = {.streamId = pReport->streamId, .taskId = pReport->taskId}; + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pTaskEntry == NULL) { + mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId); + return false; + } + + if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) { + mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard", + pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId); + return false; + } + + // now the task in checkpoint procedure + if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) { + mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64 + " discard", + pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId); + return false; + } + + if (reportChkptId >= pReport->checkpointId) { + mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64 + " discard", + pReport->taskId, pReport->checkpointId, reportChkptId); + return false; + } + + return true; +} + +static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) { + bool valid = validateChkptReport(pReport, reportChkptId); + if (!valid) { + return; + } + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { STaskChkptInfo *p = taosArrayGet(pList, i); if (p == NULL) { @@ -2463,27 +2500,38 @@ static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pRepor } if (p->taskId == pReport->taskId) { - existed = true; - break; + if (p->checkpointId > pReport->checkpointId) { + mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard", + pReport->taskId, p->checkpointId, pReport->checkpointId); + } else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it + mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64, + pReport->taskId, p->checkpointId, pReport->checkpointId); + + memcpy(p, pReport, sizeof(STaskChkptInfo)); + } else { + mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId); + } + return; } } - if (!existed) { - STaskChkptInfo info = { - .streamId = pReport->streamId, - .taskId = pReport->taskId, - .transId = pReport->transId, - .dropHTask = pReport->dropHTask, - .version = pReport->checkpointVer, - .ts = pReport->checkpointTs, - .checkpointId = pReport->checkpointId, - .nodeId = pReport->nodeId, - }; + STaskChkptInfo info = { + .streamId = pReport->streamId, + .taskId = pReport->taskId, + .transId = pReport->transId, + .dropHTask = pReport->dropHTask, + .version = pReport->checkpointVer, + .ts = pReport->checkpointTs, + .checkpointId = pReport->checkpointId, + .nodeId = pReport->nodeId, + }; - void* p = taosArrayPush(pList, &info); - if (p == NULL) { - mError("failed to put into task list, taskId:0x%x", pReport->taskId); - } + void *p = taosArrayPush(pList, &info); + if (p == NULL) { + mError("failed to put into task list, taskId:0x%x", pReport->taskId); + } else { + int32_t size = taosArrayGetSize(pList); + mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size); } } @@ -2530,23 +2578,23 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); - SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); - if (pReqTaskList == NULL) { - SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); - if (pList != NULL) { - doAddReportStreamTask(pList, &req); - code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + if (pInfo == NULL) { + SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId}; + if (info.pTaskList != NULL) { + doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req); + code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info)); if (code) { mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId); } - pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); } } else { - doAddReportStreamTask(*pReqTaskList, &req); + doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req); } - int32_t total = taosArrayGetSize(*pReqTaskList); + int32_t total = taosArrayGetSize(pInfo->pTaskList); if (total == numOfTasks) { // all tasks has send the reqs mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 " will be issued soon", diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 11556a212d..59f07ce977 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -211,6 +211,10 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { SStreamTaskResetMsg* pMsg = pReq->pCont; mndKillTransImpl(pMnode, pMsg->transId, ""); + streamMutexLock(&execInfo.lock); + (void) mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); + streamMutexUnlock(&execInfo.lock); + code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream); if (pStream == NULL || code != 0) { code = TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -333,7 +337,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); + mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId, + req.numOfTasks, req.msgId, req.ts); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); @@ -356,6 +361,31 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { TAOS_RETURN(TSDB_CODE_INVALID_MSG); } + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execInfo.pNodeList, i); + if (pEntry == NULL) { + continue; + } + + if (pEntry->nodeId != req.vgId) { + continue; + } + + if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { + mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); + + terrno = TSDB_CODE_INVALID_MSG; + doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + + streamMutexUnlock(&execInfo.lock); + cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); + return terrno; + } else { + pEntry->lastHbMsgId = req.msgId; + pEntry->lastHbMsgTs = req.ts; + } + } + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); @@ -393,6 +423,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); if (code) { + mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code)); continue; } @@ -426,7 +457,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { addIntoCheckpointList(pFailedChkpt, &info); // remove failed trans from pChkptStreams - code = taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); + code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId); if (code) { mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId); } @@ -484,14 +515,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } if (pMnode != NULL) { // make sure that the unit test case can work - mndStreamSendUpdateChkptInfoMsg(pMnode); + code = mndStreamSendUpdateChkptInfoMsg(pMnode); } streamMutexUnlock(&execInfo.lock); doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId); - cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); + return code; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index b4bd9c8847..47b0fdb412 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -129,6 +129,8 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { goto _err; } + *allReady = true; + while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) { @@ -540,8 +542,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, - TSDB_CODE_VND_INVALID_VGROUP_ID); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); return code; @@ -812,9 +813,13 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { } if (pEntry->nodeId == p->nodeId) { + p->hbTimestamp = pEntry->hbTimestamp; + void* px = taosArrayPush(pValidList, p); if (px == NULL) { mError("failed to put node into list, nodeId:%d", p->nodeId); + } else { + mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId); } break; } @@ -899,8 +904,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); - // 2. remove stream entry in consensus hash table + // 2. remove stream entry in consensus hash table and checkpoint-report hash table (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + (void) mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid); streamMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); @@ -968,9 +974,8 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + tstrerror(terrno)); return terrno; } @@ -978,12 +983,14 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); - ASSERT(pReqTaskList); + SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); + if (pStreamItem == NULL) { + return TSDB_CODE_INVALID_PARA; + } - int32_t size = taosArrayGetSize(*pReqTaskList); + int32_t size = taosArrayGetSize(pStreamItem->pTaskList); for(int32_t i = 0; i < size; ++i) { - STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); + STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i); if (pInfo == NULL) { continue; } @@ -1058,11 +1065,12 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { } mDebug("start to scan checkpoint report info"); + streamMutexLock(&execInfo.lock); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { - SArray *pList = *(SArray **)pIter; + SChkptReportInfo* px = (SChkptReportInfo *)pIter; - STaskChkptInfo *pInfo = taosArrayGet(pList, 0); + STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0); if (pInfo == NULL) { continue; } @@ -1075,12 +1083,11 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { if (p == NULL) { mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId); } - continue; } int32_t total = mndGetNumOfStreamTasks(pStream); - int32_t existed = (int32_t)taosArrayGetSize(pList); + int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList); if (total == existed) { mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", @@ -1088,14 +1095,11 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); if (!conflict) { - code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); + code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry - void* p = taosArrayPush(pDropped, &pInfo->streamId); - if (p == NULL) { - mError("failed to remove stream:0x%" PRIx64, pInfo->streamId); - } else { - mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); - } + taosArrayClear(px->pTaskList); + px->reportChkpt = pInfo->checkpointId; + mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId); } else { mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", pInfo->streamId); @@ -1130,6 +1134,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); } + streamMutexUnlock(&execInfo.lock); + taosArrayDestroy(pDropped); return TSDB_CODE_SUCCESS; } @@ -1314,7 +1320,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { - return TSDB_CODE_SUCCESS; + return code; } code = taosHashRemove(pHash, &streamId, sizeof(streamId)); @@ -1327,6 +1333,35 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { return code; } +int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) { + int32_t code = 0; + int32_t numOfStreams = taosHashGetSize(pHash); + if (numOfStreams == 0) { + return code; + } + + code = taosHashRemove(pHash, &streamId, sizeof(streamId)); + if (code == 0) { + mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams); + } else { + mError("failed to remove stream:0x%"PRIx64" in chkpt-report list, remain:%d", streamId, numOfStreams); + } + + return code; +} + +int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) { + SChkptReportInfo* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); + if (pInfo != NULL) { + taosArrayClear(pInfo->pTaskList); + mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId, + pInfo->reportChkpt); + return 0; + } + + return TSDB_CODE_MND_STREAM_NOT_EXIST; +} + static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { int8_t status = atomic_load_8(&pStream->status); if (status == STREAM_STATUS__NORMAL) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 416438b2ec..43684b9641 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -362,7 +362,7 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader); int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData); int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter); -int32_t streamTaskSnapWriterClose(SStreamTaskWriter* ppWriter, int8_t rollback); +int32_t streamTaskSnapWriterClose(SStreamTaskWriter* ppWriter, int8_t rollback, int8_t loadTask); int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index c89ad807c7..8dff60c4cf 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -192,7 +192,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa return code; } -int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { +int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback, int8_t loadTask) { int32_t code = 0; STQ* pTq = pWriter->pTq; @@ -213,6 +213,10 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { } streamMetaWUnLock(pTq->pStreamMeta); taosMemoryFree(pWriter); + + if (loadTask == 1) { + streamMetaLoadAllTasks(pTq->pStreamMeta); + } return code; _err: diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a4c490e9b5..faca2020c5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -563,7 +563,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); } - code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId); + code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId); streamMetaReleaseTask(pMeta, pTask); if (code) { return code; @@ -996,7 +996,13 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); - ASSERT(checkpointId == pReq->checkpointId); + if (checkpointId != pReq->checkpointId) { + tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64 + " req:%" PRId64, + pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_INVALID_MSG; + } if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { // re-send the lost checkpoint-trigger msg to downstream task diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 92e2ffeb7f..a54beff00b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -722,7 +722,8 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * } if (pWriter->pStreamTaskWriter) { - code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); + code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback, pWriter->pStreamStateWriter == NULL ? 1 : 0); + if (code) goto _exit; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 93d2edd639..350bd35490 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -48,23 +48,30 @@ extern "C" { #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +typedef struct SStreamTmrInfo { + int32_t activeCounter; // make sure only launch one checkpoint trigger check tmr + tmr_h tmrHandle; + int64_t launchChkptId; + int8_t isActive; +} SStreamTmrInfo; + struct SActiveCheckpointInfo { - TdThreadMutex lock; - int32_t transId; - int64_t firstRecvTs; // first time to recv checkpoint trigger info - int64_t activeId; // current active checkpoint id - int64_t failedId; - bool dispatchTrigger; - SArray* pDispatchTriggerList; // SArray - SArray* pReadyMsgList; // SArray - int8_t allUpstreamTriggerRecv; - SArray* pCheckpointReadyRecvList; // SArray - int32_t checkCounter; - tmr_h pChkptTriggerTmr; - int32_t sendReadyCheckCounter; - tmr_h pSendReadyMsgTmr; + TdThreadMutex lock; + int32_t transId; + int64_t firstRecvTs; // first time to recv checkpoint trigger info + int64_t activeId; // current active checkpoint id + int64_t failedId; + bool dispatchTrigger; + SArray* pDispatchTriggerList; // SArray + SArray* pReadyMsgList; // SArray + int8_t allUpstreamTriggerRecv; + SArray* pCheckpointReadyRecvList; // SArray + SStreamTmrInfo chkptTriggerMsgTmr; + SStreamTmrInfo chkptReadyMsgTmr; }; +int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask); + typedef struct { int8_t type; SSDataBlock* pBlock; @@ -222,7 +229,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); -int32_t streamTaskDownloadCheckpointData(const char* id, char* path); +int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d3db0347e2..537aa72d91 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -447,7 +447,7 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64 cleanDir(defaultPath, key); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); - code = streamTaskDownloadCheckpointData(key, checkpointPath); + code = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId); if (code != 0) { stError("failed to download checkpoint data:%s", key); return code; @@ -482,7 +482,7 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) { int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int8_t rename = 0; - int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); + int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId); if (code != 0) { return code; } @@ -683,7 +683,7 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch defaultPath); } } else { - code = TSDB_CODE_FAILED; + code = terrno; stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath); } @@ -763,7 +763,7 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId } if (code != 0) { - stError("failed to start stream backend at %s, restart from default defaultPath:%s, reason:%s", checkpointPath, + stError("failed to start stream backend at %s, restart from defaultPath:%s, reason:%s", checkpointPath, defaultPath, tstrerror(code)); code = 0; // reset the error code } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9da7a5d9c8..c555da9865 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -20,7 +20,7 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); -static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); +static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); @@ -297,14 +297,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + // if previous launched timer not started yet, not start a new timer + // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch + // a new checkpoint-trigger timer right now. + // And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint + // procedure to be stucked. + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); + if (old == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); + streamMetaAcquireOneTask(pTask); - if (pActiveInfo->pChkptTriggerTmr == NULL) { - pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); - } else { - streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor"); + if (pTmrInfo->tmrHandle == NULL) { + pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer); + } else { + streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + } + pTmrInfo->launchChkptId = pActiveInfo->activeId; + } else { // already launched, do nothing + stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); } } @@ -349,7 +361,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock (void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by @@ -364,8 +375,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream, int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId, - const char* id, int32_t* pNotReady, int32_t* pTransId) { - bool received = false; + const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) { + *alreadyRecv = false; int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); @@ -374,12 +385,12 @@ static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t } if (p->downstreamTaskId == downstreamTaskId) { - received = true; + (*alreadyRecv) = true; break; } } - if (received) { + if (*alreadyRecv) { stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), numOfDownstream); @@ -415,6 +426,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t code = 0; int32_t notReady = 0; int32_t transId = 0; + bool alreadyHandled = false; // 1. not in checkpoint status now SStreamTaskState pStat = streamTaskGetStatus(pTask); @@ -433,12 +445,17 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId streamMutexLock(&pInfo->lock); code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, ¬Ready, - &transId); + &transId, &alreadyHandled); streamMutexUnlock(&pInfo->lock); - if ((notReady == 0) && (code == 0)) { - stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); - (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); + if (alreadyHandled) { + stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again", + id, checkpointId, downstreamTaskId); + } else { + if ((notReady == 0) && (code == 0) && (!alreadyHandled)) { + stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); + (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); + } } return code; @@ -508,8 +525,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV streamMutexLock(&pTask->lock); if (pReq->checkpointId <= pInfo->checkpointId) { - stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 + stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 + " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, pReq->transId); @@ -550,14 +567,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && pInfo->processedVer <= pReq->checkpointVer); - pInfo->checkpointId = pReq->checkpointId; - pInfo->checkpointVer = pReq->checkpointVer; - pInfo->checkpointTime = pReq->checkpointTs; - - streamTaskClearCheckInfo(pTask, true); - + // update only it is in checkpoint status. if (pStatus.state == TASK_STATUS__CK) { - // todo handle error + pInfo->checkpointId = pReq->checkpointId; + pInfo->checkpointVer = pReq->checkpointVer; + pInfo->checkpointTime = pReq->checkpointTs; + + streamTaskClearCheckInfo(pTask, true); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); } else { stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name); @@ -670,7 +686,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d } if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadCheckpoint(idStr, path); + code = streamTaskUploadCheckpoint(idStr, path, checkpointId); if (code == TSDB_CODE_SUCCESS) { stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { @@ -810,6 +826,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { const char* id = pTask->id.idStr; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -820,24 +837,24 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check the status every 100ms if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; } - if (++pActiveInfo->checkCounter < 100) { - streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor"); + if (++pTmrInfo->activeCounter < 50) { + streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); return; } - pActiveInfo->checkCounter = 0; + pTmrInfo->activeCounter = 0; stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); streamMutexLock(&pTask->lock); SStreamTaskState pState = streamTaskGetStatus(pTask); if (pState.state != TASK_STATUS__CK) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); streamMutexUnlock(&pTask->lock); @@ -847,7 +864,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); @@ -867,6 +884,31 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { terrno = TSDB_CODE_OUT_OF_MEMORY; stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); streamMutexUnlock(&pActiveInfo->lock); + + stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); + streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + return; + } + + if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 + ", quit, ref:%d", + id, vgId, pTmrInfo->launchChkptId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // active checkpoint info is cleared for now + if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", + id, vgId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); return; } @@ -900,9 +942,9 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check every 100ms if (size > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor"); + streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1060,11 +1102,8 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { streamMutexUnlock(&pInfo->lock); } -int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - +int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { int32_t num = 0; - streamMutexLock(&pInfo->lock); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { @@ -1075,7 +1114,6 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { num++; } } - streamMutexUnlock(&pInfo->lock); return num; } @@ -1101,9 +1139,9 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } } + int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo); streamMutexUnlock(&pInfo->lock); - int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask); if (taskId == 0) { stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId); @@ -1198,7 +1236,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { } } -int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { +int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) { int32_t code = 0; if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("invalid parameters in upload checkpoint, %s", id); @@ -1206,7 +1244,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { } if (strlen(tsSnodeAddress) != 0) { - code = uploadByRsync(id, path); + code = uploadByRsync(id, path, checkpointId); if (code != 0) { return TAOS_SYSTEM_ERROR(errno); } @@ -1233,14 +1271,14 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch return 0; } -int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { +int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("down checkpoint data parameters invalid"); return -1; } if (strlen(tsSnodeAddress) != 0) { - return downloadRsync(id, path); + return downloadByRsync(id, path, checkpointId); } else if (tsS3StreamEnabled) { return s3GetObjectsByPrefix(id, path); } @@ -1281,6 +1319,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamMutexLock(&pTask->lock); + ETaskStatus p = streamTaskGetStatus(pTask).state; + if (pTask->status.sendConsensusChkptId == true) { stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); streamMutexUnlock(&pTask->lock); @@ -1291,9 +1331,15 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { streamMutexUnlock(&pTask->lock); + if (pTask->pBackend != NULL) { + streamFreeTaskState(pTask, p); + pTask->pBackend = NULL; + } + ASSERT(pTask->pBackend == NULL); pTask->status.requireConsensusChkptId = true; + stDebug("s-task:%s set the require consensus-checkpointId flag", id); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index dfd9d3e8df..493b5013d0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -820,31 +820,32 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 } static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; // check the status every 100ms if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; } - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - if (++pActiveInfo->sendReadyCheckCounter < 100) { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr, vgId, "chkpt-ready-monitor"); + if (++pTmrInfo->activeCounter < 50) { + streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); return; } - pActiveInfo->sendReadyCheckCounter = 0; - stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); streamMutexLock(&pTask->lock); SStreamTaskState pState = streamTaskGetStatus(pTask); if (pState.state != TASK_STATUS__CK) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, pState.name, ref); streamMutexUnlock(&pTask->lock); @@ -858,10 +859,21 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); - // active checkpoint info is cleared for now - if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) { + if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { streamMutexUnlock(&pActiveInfo->lock); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 + ", quit, ref:%d", + id, vgId, pTmrInfo->launchChkptId, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // active checkpoint info is cleared for now + if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); streamMetaReleaseTask(pTask->pMeta, pTask); @@ -923,10 +935,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } } - streamTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr, vgId, "chkpt-ready-monitor"); + streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); streamMutexUnlock(&pActiveInfo->lock); } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " "and quit from timer, ref:%d", @@ -975,22 +987,32 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { } } - streamMutexUnlock(&pActiveInfo->lock); stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num); // start to check if checkpoint ready msg has successfully received by upstream tasks. if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; - if (pActiveInfo->pSendReadyMsgTmr == NULL) { - pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer); + int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); + if (old == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); + streamMetaAcquireOneTask(pTask); + + if (pTmrInfo->tmrHandle == NULL) { + pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer); + } else { + streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); + } + + // mark the timer monitor checkpointId + pTmrInfo->launchChkptId = pActiveInfo->activeId; } else { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr, vgId, "chkpt-ready-monitor"); + stError("s-task:%s previous checkpoint-ready monitor tmr is set, not start new one", pTask->id.idStr); } } + streamMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_SUCCESS; } @@ -1267,17 +1289,18 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId } } -static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) { +static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, + int32_t* pFailed, const char* id) { int32_t numOfRsp = 0; - bool alreadySet = false; - bool updated = false; - bool allRsp = false; - *pNotRsp = 0; + int32_t numOfFailed = 0; - streamMutexLock(&pMsgInfo->lock); + bool allRsp = false; int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); - for(int32_t i = 0; i < numOfDispatchBranch; ++i) { + *pNotRsp = 0; + *pFailed = 0; + + for (int32_t i = 0; i < numOfDispatchBranch; ++i) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); if (pEntry == NULL) { continue; @@ -1295,24 +1318,34 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t } if (pEntry->nodeId == vgId) { - ASSERT(!alreadySet); - pEntry->rspTs = now; - pEntry->status = code; - alreadySet = true; - updated = true; - numOfRsp += 1; + if (pEntry->rspTs != -1) { + stDebug("s-task:%s dispatch rsp has already recved at:%" PRId64 ", ignore this rsp, msgId:%d", id, + pEntry->rspTs, pMsgInfo->msgId); + allRsp = false; + } else { + pEntry->rspTs = now; + pEntry->status = code; + numOfRsp += 1; + allRsp = (numOfRsp == numOfDispatchBranch); - stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, - numOfRsp, numOfDispatchBranch); + stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, + numOfRsp, numOfDispatchBranch); + } + break; } } + // this code may be error code. + for (int32_t i = 0; i < numOfDispatchBranch; ++i) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); + if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) { + numOfFailed += 1; + } + } + + *pFailed = numOfFailed; *pNotRsp = numOfDispatchBranch - numOfRsp; - allRsp = (numOfRsp == numOfDispatchBranch); - streamMutexUnlock(&pMsgInfo->lock); - - ASSERT(updated); return allRsp; } @@ -1345,15 +1378,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int64_t now = taosGetTimestampMs(); bool allRsp = false; int32_t notRsp = 0; + int32_t numOfFailed = 0; + bool triggerDispatchRsp = false; + + // we only set the dispatch msg info for current checkpoint trans + streamMutexLock(&pTask->lock); + triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) && + (pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId); + streamMutexUnlock(&pTask->lock); streamMutexLock(&pMsgInfo->lock); - int32_t msgId = pMsgInfo->msgId; - streamMutexUnlock(&pMsgInfo->lock); + int32_t msgId = pMsgInfo->msgId; // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); + streamMutexUnlock(&pMsgInfo->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } @@ -1362,6 +1403,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64 " discard it", id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage); + streamMutexUnlock(&pMsgInfo->lock); return TSDB_CODE_INVALID_MSG; } @@ -1373,18 +1415,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, &numOfFailed, id); } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, &numOfFailed, id); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, &numOfFailed, id); stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } else { @@ -1396,15 +1438,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } - allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, &numOfFailed, id); { bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { - streamMutexLock(&pTask->lock); // we only set the dispatch msg info for current checkpoint trans - if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK && - pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) { + if (triggerDispatchRsp) { ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId); @@ -1415,12 +1455,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i " transId:%d discard, since expired", pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId); } - streamMutexUnlock(&pTask->lock); } } } } + streamMutexUnlock(&pMsgInfo->lock); + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (!allRsp) { stDebug( @@ -1439,29 +1480,25 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // all msg rsp already, continue - if (allRsp) { - ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); + // we need to re-try send dispatch msg to downstream tasks + if (allRsp && (numOfFailed == 0)) { + // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state + if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, + msgId); + ASSERT(pTask->info.fillHistory == 1); - // we need to re-try send dispatch msg to downstream tasks - int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); - if (numOfFailed == 0) { // this message has been sent successfully, let's try next one. - // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state - if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", - id, msgId); - ASSERT(pTask->info.fillHistory == 1); - - code = streamTransferStatePrepare(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens - } - - clearBufferedDispatchMsg(pTask); - - // now ready for next data output - atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); - } else { - code = handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + code = streamTransferStatePrepare(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } + + clearBufferedDispatchMsg(pTask); + + // now ready for next data output + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + } else { + // this message has been sent successfully, let's try next one. + code = handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 9804943ec2..8513a8ba06 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -142,11 +142,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } SStreamHbMsg* pMsg = &pInfo->hbMsg; - stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER), - pMeta->pHbInfo->hbCount); - pMsg->vgId = pMeta->vgId; pMsg->msgId = pMeta->pHbInfo->hbCount; + pMsg->ts = taosGetTimestampMs(); + + stDebug("vgId:%d build stream hbMsg, leader:%d HbMsgId:%d, HbMsgTs:%" PRId64, pMeta->vgId, + (pMeta->role == NODE_ROLE_LEADER), pMsg->msgId, pMsg->ts); pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); @@ -292,7 +293,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { streamMetaRLock(pMeta); code = streamMetaSendHbHelper(pMeta); if (code) { - stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, strerror(code)); + stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); } streamMetaRUnLock(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fbcc338f09..5ed9f274a2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -318,7 +318,19 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) pBackend->pTask = pTask; pBackend->pMeta = pMeta; - if (processVer != -1) pTask->chkInfo.processedVer = processVer; + if (processVer != -1) { + if (pTask->chkInfo.processedVer != processVer) { + stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64, + pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId); + pTask->chkInfo.processedVer = processVer; + pTask->chkInfo.checkpointVer = processVer; + pTask->chkInfo.nextProcessVer = processVer + 1; + } else { + stInfo("s-task:%s vgId:%d processedVer:%" PRId64 + " in task meta equals to data in checkpoint data for checkpointId:%" PRId64, + pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId); + } + } code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); if (code) { @@ -1407,7 +1419,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { } // negotiate the consensus checkpoint id for current task - ASSERT(pTask->pBackend == NULL); code = streamTaskSendRestoreChkptMsg(pTask); // this task may has no checkpoint, but others tasks may generate checkpoint already? diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 1a48b594ea..6fe2d818b3 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -398,6 +398,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { } if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -470,6 +471,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { } if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; tEndDecode(pDecoder); return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 21362ca806..c0b2b16d30 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1140,14 +1140,16 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { taosArrayDestroy(pInfo->pCheckpointReadyRecvList); pInfo->pCheckpointReadyRecvList = NULL; - if (pInfo->pChkptTriggerTmr != NULL) { - (void) taosTmrStop(pInfo->pChkptTriggerTmr); - pInfo->pChkptTriggerTmr = NULL; + SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; + if (pTriggerTmr->tmrHandle != NULL) { + (void) taosTmrStop(pTriggerTmr->tmrHandle); + pTriggerTmr->tmrHandle = NULL; } - if (pInfo->pSendReadyMsgTmr != NULL) { - (void) taosTmrStop(pInfo->pSendReadyMsgTmr); - pInfo->pSendReadyMsgTmr = NULL; + SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; + if (pReadyTmr->tmrHandle != NULL) { + (void) taosTmrStop(pReadyTmr->tmrHandle); + pReadyTmr->tmrHandle = NULL; } taosMemoryFree(pInfo); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index a54c17df03..8fd26dda27 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -96,12 +96,6 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv } } -static int32_t stopTaskSuccFn(SStreamTask* pTask) { - SStreamTaskSM* pSM = pTask->status.pSM; - streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0); - return TSDB_CODE_SUCCESS; -} - int32_t streamTaskInitStatus(SStreamTask* pTask) { pTask->execInfo.checkTs = taosGetTimestampMs(); stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, @@ -698,21 +692,21 @@ void doInitStateTransferTable(void) { // resume is completed by restore status of state-machine // stop related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); // dropping related event diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index fb1740ae0a..8f2fc83b72 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -50,3 +50,13 @@ void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* h // stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg); // } } + +int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) { + pInfo->activeCounter = 0; + pInfo->launchChkptId = 0; + atomic_store_8(&pInfo->isActive, 0); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + ASSERT(ref >= 0); + return ref; +} \ No newline at end of file diff --git a/tests/script/tsim/stream/checkpointInterval0.sim b/tests/script/tsim/stream/checkpointInterval0.sim index a548f05c82..a5e5c87704 100644 --- a/tests/script/tsim/stream/checkpointInterval0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -76,6 +76,8 @@ system sh/stop_dnodes.sh system sh/exec.sh -n dnode1 -s start +run tsim/stream/checkTaskStatus.sim + sql insert into t1 values(1648791213002,3,2,3,1.1); $loop_count = 0 diff --git a/tests/system-test/8-stream/state_window_case.py b/tests/system-test/8-stream/state_window_case.py index 5ecf8d7832..3015b0db42 100644 --- a/tests/system-test/8-stream/state_window_case.py +++ b/tests/system-test/8-stream/state_window_case.py @@ -30,14 +30,14 @@ class TDTestCase: tdSql.execute("CREATE STREAM stream_device_alarm2 TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm2 tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\ as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, 1 state_flag from st_variable_data\ PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)") - time.sleep(2) + time.sleep(5) def insert_data(self): try: tdSql.execute("insert into aaa values('2024-07-15 14:00:00', '2024-07-15 14:00:00', 'a8')", queryTimes=5, show=True) time.sleep(0.01) tdSql.execute("insert into aaa values('2024-07-15 14:10:00', '2024-07-15 14:10:00', 'a9')", queryTimes=5, show=True) - time.sleep(1) + time.sleep(5) except Exception as error: tdLog.exit(f"insert data failed {error}")