diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index 3f1dc77773..b69032330d 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -213,19 +213,21 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq typedef struct SRestoreCheckpointInfo { SMsgHead head; + int64_t startTs; int64_t streamId; int64_t checkpointId; // latest checkpoint id int32_t taskId; int32_t nodeId; } SRestoreCheckpointInfo; -int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); -int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); +int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq); +int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq); typedef struct SRestoreCheckpointInfoRsp { - int64_t streamId; - int64_t checkpointId; - int32_t taskId; + int64_t streamId; + int64_t checkpointId; + int64_t startTs; + int32_t taskId; } SRestoreCheckpointInfoRsp; int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo); @@ -238,11 +240,11 @@ typedef struct { int32_t reqType; } SStreamTaskRunReq; -typedef struct SCheckpointConsensusInfo { +typedef struct SCheckpointConsensusEntry { SRestoreCheckpointInfo req; SRpcMsg rsp; int64_t ts; -} SCheckpointConsensusInfo; +} SCheckpointConsensusEntry; #ifdef __cplusplus } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f358e2c310..e98039d2fe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -266,12 +266,13 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { - int64_t startTs; - int64_t checkpointId; // latest checkpoint id - int64_t checkpointVer; // latest checkpoint offset in wal - int64_t checkpointTime; // latest checkpoint time - int64_t processedVer; - int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t startTs; + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time + int64_t processedVer; + int64_t nextProcessVer; // current offset in WAL, not serialize it + SActiveCheckpointInfo* pActiveInfo; int64_t msgVer; } SCheckpointInfo; @@ -613,6 +614,12 @@ typedef struct SStreamTaskState { char* name; } SStreamTaskState; +typedef struct SCheckpointConsensusInfo { + SArray* pTaskList; + int64_t checkpointId; + int64_t genTs; +} SCheckpointConsensusInfo; + int32_t streamSetupScheduleTrigger(SStreamTask* pTask); // dispatch related @@ -747,7 +754,7 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); -void streamMetaResetStartInfo(STaskStartInfo* pMeta); +void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId); SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); void streamMetaLoadAllTasks(SStreamMeta* pMeta); @@ -755,7 +762,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); -int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask); +int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask); // timer tmr_h streamTimerGetInstance(); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 611891e298..60b36b3461 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -828,6 +828,7 @@ TEST(clientCase, projection_query_tables) { // printf("error in create db, reason:%s\n", taos_errstr(pRes)); // } // taos_free_result(pRes); + TAOS_RES* pRes = NULL; pRes= taos_query(pConn, "use abc1"); taos_free_result(pRes); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 54fbb6c31b..69f430778b 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -133,7 +133,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); -int32_t mndStreamSetRestoreCheckpointId(SArray* pList, int64_t checkpointId); +int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); @@ -145,8 +145,14 @@ void mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -void mndAddConsensusTasks(SArray *pList, const SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg); -int64_t mndGetConsensusCheckpointId(SArray *pList, SStreamObj *pStream); + +SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId); +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg); +int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream); +bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal); +void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); +int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId); +int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index dd92026e97..41b4f5145b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1235,6 +1235,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); + // clear the consensus checkpoint info + mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid); + if (code != -1) { started += 1; @@ -2627,7 +2630,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { SRestoreCheckpointInfo req = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); - if (tDecodeStreamTaskLatestChkptInfo(&decoder, &req)) { + if (tDecodeRestoreCheckpointInfo(&decoder, &req)) { tDecoderClear(&decoder); terrno = TSDB_CODE_INVALID_MSG; mError("invalid task consensus-checkpoint msg received"); @@ -2662,34 +2665,44 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); - SArray **pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId)); - if (pTaskList == NULL) { - SArray *pList = taosArrayInit(4, sizeof(SCheckpointConsensusInfo)); - mndAddConsensusTasks(pList, &req, pReq); - taosHashPut(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId); + int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream); + if (ckId != -1) { // consensus checkpoint id already exist + SRpcMsg rsp = {0}; + rsp.code = 0; + rsp.info = pReq->info; + rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); + rsp.pCont = rpcMallocCont(rsp.contLen); - pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId)); - } else { - mndAddConsensusTasks(*pTaskList, &req, pReq); + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId); + doSendConsensusCheckpointRsp(&req, &rsp, ckId); + + taosThreadMutexUnlock(&execInfo.lock); + pReq->info.handle = NULL; // disable auto rsp + + return TSDB_CODE_SUCCESS; } - int32_t total = taosArrayGetSize(*pTaskList); - if (total == numOfTasks) { // all tasks has send the reqs + mndAddConsensusTasks(pInfo, &req, pReq); + + int32_t total = 0; + if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) { // all tasks has send the reqs // start transaction to set the checkpoint id - int64_t checkpointId = mndGetConsensusCheckpointId(*pTaskList, pStream); + int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream); mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64 " will be issued soon", - req.streamId, pStream->name, total, checkpointId); + req.streamId, pStream->name, numOfTasks, checkpointId); // start the checkpoint consensus trans - int32_t code = mndStreamSetRestoreCheckpointId(*pTaskList, checkpointId); - if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry - taosHashRemove(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId)); - int32_t numOfStreams = taosHashGetSize(execInfo.pStreamConsensus); - mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", req.streamId, numOfStreams); + int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId); + if (code == TSDB_CODE_SUCCESS) { + mndClearConsensusRspEntry(pInfo); + mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId); } else { - mDebug("stream:0x%" PRIx64 " not start set consensus-checkpointId trans, due to not all task ready", - req.streamId); + mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId); } } else { mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks); @@ -2700,19 +2713,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) { } taosThreadMutexUnlock(&execInfo.lock); - pReq->info.handle = NULL; // disable auto rsp -// { // start an transaction to set the start checkpoint id -// SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SRestoreCheckpointInfoRsp)}; -// rsp.pCont = rpcMallocCont(rsp.contLen); -// SMsgHead *pHead = rsp.pCont; -// pHead->vgId = htonl(req.nodeId); -// -// tmsgSendRsp(&rsp); -// pReq->info.handle = NULL; // disable auto rsp -// } - return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 4113bbe2b0..98b9c7c0c7 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -821,11 +821,12 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } -static int32_t doSendRestoreCheckpointInfo(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) { +int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) { int32_t code = 0; int32_t blen; - SRestoreCheckpointInfoRsp req = {.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId}; + SRestoreCheckpointInfoRsp req = { + .streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs}; tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code); if (code < 0) { @@ -848,51 +849,120 @@ static int32_t doSendRestoreCheckpointInfo(SRestoreCheckpointInfo* pInfo, SRpcMs return code; } -int32_t mndStreamSetRestoreCheckpointId(SArray* pInfoList, int64_t checkpointId) { +int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) { for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) { - SCheckpointConsensusInfo* pInfo = taosArrayGet(pInfoList, i); - doSendRestoreCheckpointInfo(&pInfo->req, &pInfo->rsp, checkpointId); + SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i); + doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId); } return 0; } -void mndAddConsensusTasks(SArray* pList, const SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg) { - bool existed = false; - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - STaskChkptInfo* p = taosArrayGet(pList ,i); - if (p->taskId == pInfo->taskId) { - existed = true; - break; - } +SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) { + void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); + if (pInfo != NULL) { + return (SCheckpointConsensusInfo*)pInfo; } - if (!existed) { - SCheckpointConsensusInfo info = {0}; - memcpy(&info.req, pInfo, sizeof(info.req)); + SCheckpointConsensusInfo p = { + .genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))}; + taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); - info.rsp.code = 0; - info.rsp.info = pMsg->info; - info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); - info.rsp.pCont = rpcMallocCont(info.rsp.contLen); - - SMsgHead *pHead = info.rsp.pCont; - pHead->vgId = htonl(pInfo->nodeId); - - taosArrayPush(pList, &info); - } + void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); + return pChkptInfo; } -int64_t mndGetConsensusCheckpointId(SArray* pList, SStreamObj* pStream) { - int64_t checkpointId = INT64_MAX; +// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually +// discard the msg may lead to the lost of connections. +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) { + SCheckpointConsensusEntry info = {0}; + memcpy(&info.req, pRestoreInfo, sizeof(info.req)); - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - SCheckpointConsensusInfo *pInfo = taosArrayGet(pList, i); - if (pInfo->req.checkpointId < checkpointId) { - checkpointId = pInfo->req.checkpointId; - mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name, - pInfo->req.taskId, pInfo->req.nodeId, pInfo->req.checkpointId); + info.rsp.code = 0; + info.rsp.info = pMsg->info; + info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead); + info.rsp.pCont = rpcMallocCont(info.rsp.contLen); + + SMsgHead *pHead = info.rsp.pCont; + pHead->vgId = htonl(pRestoreInfo->nodeId); + + taosArrayPush(pInfo->pTaskList, &info); +} + +static int32_t entryComparFn(const void* p1, const void* p2) { + const SCheckpointConsensusEntry* pe1 = p1; + const SCheckpointConsensusEntry* pe2 = p2; + + if (pe1->req.taskId == pe2->req.taskId) { + return 0; + } + + return pe1->req.taskId < pe2->req.taskId? -1:1; +} + +bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) { + int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList); + if (numOfExisted < numOfTasks) { + if (pTotal != NULL) { + *pTotal = numOfExisted; + } + return false; + } + + taosArraySort(pInfo->pTaskList, entryComparFn); + + int32_t num = 1; + int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId; + for(int32_t i = 1; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i); + if (pe->req.taskId != taskId) { + num += 1; + taskId = pe->req.taskId; } } + if (pTotal != NULL) { + *pTotal = num; + } + + ASSERT(num <= numOfTasks); + return num == numOfTasks; +} + +int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) { + if (pInfo->genTs > 0) { + ASSERT(pInfo->checkpointId > 0); + return pInfo->checkpointId; + } + + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + if (!mndAllTaskSendCheckpointId(pInfo, numOfTasks, NULL)) { + return -1; + } + + int64_t checkpointId = INT64_MAX; + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, i); + if (pEntry->req.checkpointId < checkpointId) { + checkpointId = pEntry->req.checkpointId; + mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name, + pEntry->req.taskId, pEntry->req.nodeId, pEntry->req.checkpointId); + } + } + + pInfo->checkpointId = checkpointId; + pInfo->genTs = taosGetTimestampMs(); return checkpointId; -} \ No newline at end of file +} + +void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { + pInfo->pTaskList = taosArrayDestroy(pInfo->pTaskList); +} + +int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { + taosHashRemove(pHash, &streamId, sizeof(streamId)); + int32_t numOfStreams = taosHashGetSize(pHash); + mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d", streamId, + numOfStreams); + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6fe9df316f..9686fd3789 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -25,7 +25,7 @@ #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) // clang-format on -int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { +int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); if (code != TSDB_CODE_SUCCESS) { @@ -71,8 +71,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { startRsync(); pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, - taosGetTimestampMs(), tqStartTaskCompleteCallback); + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); if (pSnode->pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2e90a26d1e..0a64b9c165 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -139,9 +139,11 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pCheckInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); + + int32_t vgId = pTq->pStreamMeta->vgId; streamMetaClose(pTq->pStreamMeta); - qDebug("end to close tq"); + qDebug("vgId:%d end to close tq", vgId); taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5ca5b9e63e..bdee0d7586 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -146,7 +146,7 @@ int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int3 return TSDB_CODE_STREAM_TASK_NOT_EXIST; } - int32_t code = streamTaskSendConsensusChkptMsg(pTask); + int32_t code = streamTaskSendRestoreChkptMsg(pTask); streamMetaReleaseTask(pMeta, pTask); return code; } @@ -675,8 +675,11 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored if (ppTask != NULL && (*ppTask) != NULL) { streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); } else { // failed to get the task. - tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already", - vgId, pReq->taskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqError( + "vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been " + "dropped already", + vgId, pReq->taskId, numOfTasks); } streamMetaWUnLock(pMeta); @@ -729,7 +732,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaWUnLock(pMeta); streamMetaStartAllTasks(pMeta); } else { - streamMetaResetStartInfo(&pMeta->startInfo); + streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); streamMetaWUnLock(pMeta); tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId); } @@ -877,7 +880,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); ASSERT(pTask->status.downstreamReady == 0); tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); -// tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } @@ -1114,7 +1116,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - bool updateCheckpointId = false; + int64_t now = taosGetTimestampMs(); SRestoreCheckpointInfoRsp req = {0}; @@ -1133,8 +1135,19 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { - tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, - req.taskId); + tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, req.taskId); + streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); + return TSDB_CODE_SUCCESS; + } + + // discard the rsp from before restart + if (req.startTs < pTask->execInfo.created) { + tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 + " from task createTs:%" PRId64 ", discard", + pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs); + streamMetaAddFailedTaskSelf(pTask, now); + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } @@ -1148,23 +1161,11 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); pTask->chkInfo.checkpointId = req.checkpointId; - updateCheckpointId = true; - streamMetaSaveTask(pMeta, pTask); } - taosThreadMutexUnlock(&pTask->lock); - if (updateCheckpointId) { - streamMetaWLock(pMeta); - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - streamMetaWUnLock(pMeta); - } - - // todo: set the update transId, and discard with less transId. if (pMeta->role == NODE_ROLE_LEADER) { - /*code = */tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); + /*code = */ tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); } else { tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 846ce1a51a..7ec453a68d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -65,6 +65,11 @@ struct SActiveCheckpointInfo { tmr_h pSendReadyMsgTmr; }; +struct SConsensusCheckpoint { + int8_t inProcess; + +}; + typedef struct { int8_t type; SSDataBlock* pBlock; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 04d52bb052..a82628ea52 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1108,7 +1108,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { return 0; } -int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) { +int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { int32_t code; int32_t tlen = 0; int32_t vgId = pTask->pMeta->vgId; @@ -1118,9 +1118,14 @@ int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) { ASSERT(pTask->pBackend == NULL); SRestoreCheckpointInfo req = { - .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pInfo->checkpointId}; + .streamId = pTask->id.streamId, + .taskId = pTask->id.taskId, + .nodeId = vgId, + .checkpointId = pInfo->checkpointId, + .startTs = pTask->execInfo.created, + }; - tEncodeSize(tEncodeStreamTaskLatestChkptInfo, &req, tlen, code); + tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code); if (code < 0) { stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); return -1; @@ -1135,7 +1140,7 @@ int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) { SEncoder encoder; tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeStreamTaskLatestChkptInfo(&encoder, &req)) < 0) { + if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) { rpcFreeCont(buf); stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code)); return -1; @@ -1144,8 +1149,8 @@ int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) { SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen); - stDebug("s-task:%s vgId:%d send task latest-checkpoint-id to mnode:%" PRId64 " to reach the consensus checkpointId", - id, vgId, pInfo->checkpointId); + stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId, + pInfo->checkpointId); tmsgSendReq(&pTask->info.mnodeEpset, &msg); return 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5980483a5d..d9b78094a0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -473,7 +473,7 @@ void streamMetaClear(SStreamMeta* pMeta) { } void streamMetaClose(SStreamMeta* pMeta) { - stDebug("start to close stream meta"); + stDebug("vgId:%d start to close stream meta", pMeta->vgId); if (pMeta == NULL) { return; } @@ -489,11 +489,13 @@ void streamMetaClose(SStreamMeta* pMeta) { void streamMetaCloseImpl(void* arg) { SStreamMeta* pMeta = arg; - stDebug("start to do-close stream meta"); if (pMeta == NULL) { return; } + int32_t vgId = pMeta->vgId; + stDebug("vgId:%d start to do-close stream meta", vgId); + streamMetaWLock(pMeta); streamMetaClear(pMeta); streamMetaWUnLock(pMeta); @@ -526,7 +528,7 @@ void streamMetaCloseImpl(void* arg) { taosThreadRwlockDestroy(&pMeta->lock); taosMemoryFree(pMeta); - stDebug("end to close stream meta"); + stDebug("vgId:%d end to close stream meta", vgId); } // todo let's check the status for each task @@ -888,13 +890,16 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } + stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64, + pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer); + // do duplicate task check. STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { - stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno)); + stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); continue; } @@ -978,7 +983,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t sendCount = 0; streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); - stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", + stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); // wait for the stream meta hb function stopping @@ -1020,7 +1025,7 @@ void streamMetaStartHb(SStreamMeta* pMeta) { streamMetaHbToMnode(pRid, NULL); } -void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { +void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { taosHashClear(pStartInfo->pReadyTaskSet); taosHashClear(pStartInfo->pFailedTaskSet); pStartInfo->tasksWillRestart = 0; @@ -1028,6 +1033,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { // reset the sentinel flag value to be 0 pStartInfo->startAllTasks = 0; + stDebug("vgId:%d clear all start-all-task info", vgId); } void streamMetaRLock(SStreamMeta* pMeta) { @@ -1249,7 +1255,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { // negotiate the consensus checkpoint id for current task ASSERT(pTask->pBackend == NULL); - code = streamTaskSendConsensusChkptMsg(pTask); + code = streamTaskSendRestoreChkptMsg(pTask); // this task may has no checkpoint, but others tasks may generate checkpoint already? streamMetaReleaseTask(pMeta, pTask); @@ -1420,7 +1426,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); + streamMetaResetStartInfo(pStartInfo, pMeta->vgId); streamMetaWUnLock(pMeta); pStartInfo->completeFn(pMeta); diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 8f979b1538..e0435156e2 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -629,8 +629,9 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq return 0; } -int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { +int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; @@ -639,8 +640,9 @@ int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreChec return pEncoder->pos; } -int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { +int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; @@ -651,6 +653,7 @@ int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointI int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) { if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1; if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1; if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1; if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1; @@ -660,6 +663,7 @@ int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpoi int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) { if (tStartDecode(pCoder) < 0) return -1; + if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1; if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1; if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1; if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1;