diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5c61265c01..e275c1511d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -578,6 +578,7 @@ typedef struct STaskCkptInfo { int32_t activeTransId; // checkpoint trans id int8_t failed; // denote if the checkpoint is failed or not int8_t consensusChkptId; // required the consensus-checkpointId + int64_t consensusTs; // } STaskCkptInfo; typedef struct STaskStatusEntry { @@ -588,8 +589,6 @@ typedef struct STaskStatusEntry { int32_t nodeId; SVersionRange verRange; // start/end version in WAL, only valid for source task int64_t processedVer; // only valid for source task - bool inputQChanging; // inputQ is changing or not - int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; double procsThroughput; // duration between one element put into input queue and being processed. diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4faa8bdb58..d2252d1bee 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); -static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); +//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); @@ -2617,10 +2617,11 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { return 0; } -static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks) { +static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pExistedTasks, bool *pAllSame) { int32_t num = 0; 
int64_t chkId = INT64_MAX; *pExistedTasks = 0; + *pAllSame = true; for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId* p = taosArrayGet(execInfo.pTaskList, i); @@ -2631,6 +2632,9 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx num += 1; STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if ((chkId != INT64_MAX) && (chkId != pe->checkpointInfo.latestId)) {  // any mismatch with the running minimum (larger or smaller) means the ids are not all the same + *pAllSame = false; + } if (chkId > pe->checkpointInfo.latestId) { chkId = pe->checkpointInfo.latestId; } } @@ -2653,99 +2657,99 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, pInfo->handle = NULL; // disable auto rsp } -static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SDecoder decoder = {0}; - - SRestoreCheckpointInfo req = {0}; - tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen); - - if (tDecodeRestoreCheckpointInfo(&decoder, &req)) { - tDecoderClear(&decoder); - terrno = TSDB_CODE_INVALID_MSG; - mError("invalid task consensus-checkpoint msg received"); - return -1; - } - tDecoderClear(&decoder); - - mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64, - req.nodeId, req.streamId, req.taskId, req.checkpointId); - - // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. - taosThreadMutexLock(&execInfo.lock); - - // mnode handle the create stream transaction too slow may cause this problem - SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - if (pStream == NULL) { - mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId); - - // not in meta-store yet, try to acquire the task in exec buffer - // the checkpoint req arrives too soon before the completion of the create stream trans. 
- STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; - void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); - if (p == NULL) { - mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); - - doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); - return -1; - } else { - mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", - req.streamId, req.taskId); - // todo wait for stream is created - } - } - - mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64, req.nodeId, req.streamId, pStream->name, - pStream->checkpointId); - - int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); - if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly - taosThreadMutexUnlock(&execInfo.lock); - mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs); - - doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); - return TSDB_CODE_SUCCESS; - } - - int32_t num = 0; - int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num); - - // some tasks not send hbMsg to mnode yet, wait for 5s. 
- if (chkId == -1) { - mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num, - numOfTasks); - SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); - mndAddConsensusTasks(pInfo, &req); - - taosThreadMutexUnlock(&execInfo.lock); - doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); - return 0; - } - - if (chkId == req.checkpointId) { - mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64, - req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId); - mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs); - - taosThreadMutexUnlock(&execInfo.lock); - doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); - return 0; - } - - // wait for 5s and check again - SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); - mndAddConsensusTasks(pInfo, &req); - - if (pStream != NULL) { - mndReleaseStream(pMnode, pStream); - } - - taosThreadMutexUnlock(&execInfo.lock); - doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); - return 0; -} +//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { +// SMnode *pMnode = pMsg->info.node; +// SDecoder decoder = {0}; +// +// SRestoreCheckpointInfo req = {0}; +// tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen); +// +// if (tDecodeRestoreCheckpointInfo(&decoder, &req)) { +// tDecoderClear(&decoder); +// terrno = TSDB_CODE_INVALID_MSG; +// mError("invalid task consensus-checkpoint msg received"); +// return -1; +// } +// tDecoderClear(&decoder); +// +// mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64, +// req.nodeId, req.streamId, req.taskId, req.checkpointId); +// +// // register to the stream task done map, 
if all tasks has sent this kinds of message, start the checkpoint trans. +// taosThreadMutexLock(&execInfo.lock); +// +// // mnode handle the create stream transaction too slow may cause this problem +// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); +// if (pStream == NULL) { +// mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId); +// +// // not in meta-store yet, try to acquire the task in exec buffer +// // the checkpoint req arrives too soon before the completion of the create stream trans. +// STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; +// void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); +// if (p == NULL) { +// mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); +// terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; +// taosThreadMutexUnlock(&execInfo.lock); +// +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return -1; +// } else { +// mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", +// req.streamId, req.taskId); +// // todo wait for stream is created +// } +// } +// +// mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64, req.nodeId, req.streamId, pStream->name, +// pStream->checkpointId); +// +// int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); +// if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly +// taosThreadMutexUnlock(&execInfo.lock); +// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs); +// +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return TSDB_CODE_SUCCESS; +// } +// +// int32_t num = 0; +// int64_t chkId = getConsensusId(req.streamId, numOfTasks, &num); +// +// // some tasks not send hbMsg to mnode yet, wait for 5s. 
+// if (chkId == -1) { +// mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num, +// numOfTasks); +// SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); +// mndAddConsensusTasks(pInfo, &req); +// +// taosThreadMutexUnlock(&execInfo.lock); +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return 0; +// } +// +// if (chkId == req.checkpointId) { +// mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64, +// req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId); +// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs); +// +// taosThreadMutexUnlock(&execInfo.lock); +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return 0; +// } +// +// // wait for 5s and check again +// SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); +// mndAddConsensusTasks(pInfo, &req); +// +// if (pStream != NULL) { +// mndReleaseStream(pMnode, pStream); +// } +// +// taosThreadMutexUnlock(&execInfo.lock); +// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); +// return 0; +//} int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -2753,6 +2757,15 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SArray *pStreamList = taosArrayInit(4, sizeof(int64_t)); mDebug("start to process consensus-checkpointId in tmr"); + bool allReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(pNodeSnapshot); + if (!allReady) { + mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process"); + taosArrayDestroy(pStreamList);  // release the list allocated above; this early return would otherwise leak it on every timer tick + return 0; + } + taosThreadMutexLock(&execInfo.lock); void *pIter = NULL; @@ -2765,7 +2778,8 @@ int32_t 
mndProcessConsensusInTmr(SRpcMsg *pMsg) { SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId); if (pStream == NULL) { // stream has been dropped already - mDebug("stream:0x%"PRIx64" dropped already, continue", pInfo->streamId); + mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId); + taosArrayDestroy(pList); continue; } @@ -2773,15 +2787,18 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j); streamId = pe->req.streamId; - if ((now - pe->ts) >= 10 * 1000) { - int32_t existed = 0; - int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed); - if (chkId == -1) { - mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed, - pInfo->numOfTasks, pe->req.taskId); - break; - } + int32_t existed = 0; + bool allSame = true; + int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &existed, &allSame); + if (chkId == -1) { + mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", existed, + pInfo->numOfTasks, pe->req.taskId); + break; + } + if (((now - pe->ts) >= 10 * 1000) || allSame) {  // log args below follow the format order: taskId, sendTs (integer), elapsed seconds (double) + mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId, + pe->ts, (now - pe->ts) / 1000.0); ASSERT(chkId <= pe->req.checkpointId); mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 8a374c99ef..1452ac77d2 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -290,7 +290,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { .streamId = p->id.streamId, .taskId = p->id.taskId, .checkpointId = p->checkpointInfo.latestId, - .startTs = pTaskEntry->startTime, + .startTs = pChkInfo->consensusTs, }; SStreamObj *pStream = mndGetStreamObj(pMnode, 
p->id.streamId); @@ -320,21 +320,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // remove failed trans from pChkptStreams taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); } - -/* if (pChkInfo->consensusChkptId != 0) { - SRestoreCheckpointInfo cp = { - .streamId = p->id.streamId, - .taskId = p->id.taskId, - .checkpointId = p->checkpointInfo.latestId, - .startTs = pTaskEntry->startTime, - }; - - SStreamObj* pStream = mndGetStreamObj(pMnode, p->id.streamId); - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks); - mndAddConsensusTasks(pInfo, &cp, NULL); - mndReleaseStream(pMnode, pStream); - }*/ } if (p->status == pTaskEntry->status) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 430bfcc3a2..be17082200 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -87,7 +87,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { int32_t replica = -1; // do the replica check *allReady = true; - SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); + SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupListSnapshot, &entry); + taosArrayPush(pVgroupList, &entry); sdbRelease(pSdb, pVgroup); } @@ -152,11 +152,11 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupListSnapshot, &entry); + taosArrayPush(pVgroupList, &entry); sdbRelease(pSdb, pObj); } - return 
pVgroupListSnapshot; + return pVgroupList; } SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { @@ -960,9 +960,10 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); if (p->req.taskId == info.req.taskId) { - mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 - ", ignore this, total existed:%d", - pRestoreInfo->taskId, pRestoreInfo->streamId, (int32_t)taosArrayGetSize(pInfo->pTaskList)); + mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 + "->%" PRId64 " total existed:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, (int32_t)taosArrayGetSize(pInfo->pTaskList));  // old ts -> new ts, read before the overwrite on the next line + p->req.startTs = info.req.startTs; return; } } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index dc55acbb5c..a94c17f735 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1161,7 +1161,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return TSDB_CODE_SUCCESS; } -#if 0 + // discard the rsp, since it is expired. 
if (req.startTs < pTask->execInfo.created) { tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 @@ -1171,7 +1171,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } -#endif + tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode", pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index f10296f6ff..1bc91d6984 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -350,6 +350,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs) < 0) return -1; if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; @@ -405,6 +406,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1; if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1; if (tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5506ed2d45..f72a5dd434 100644 --- 
a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -849,6 +849,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { .checkpointInfo.latestSize = 0, .checkpointInfo.remoteBackup = 0, .checkpointInfo.consensusChkptId = 0, + .checkpointInfo.consensusTs = taosGetTimestampMs(), .hTaskId = pTask->hTaskInfo.id.taskId, .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),