From ee6516f3c4e0c2525e508c9dbbd3658b1c1e5c79 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Oct 2023 10:13:53 +0800 Subject: [PATCH] enh(stream): add trans to reset task status to avoid being frozen in checkpoint status due to doing checkpoint failure of partial tasks. --- include/common/tmsg.h | 2 +- include/common/tmsgdef.h | 3 +- include/libs/stream/tstream.h | 23 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 3 +- source/dnode/mnode/impl/src/mndStream.c | 303 ++++++++++++++------ source/dnode/mnode/impl/src/mndTrans.c | 1 - source/dnode/snode/src/snode.c | 3 - source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 51 ++-- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +- source/libs/executor/src/scanoperator.c | 3 +- source/libs/stream/src/streamCheckpoint.c | 18 +- source/libs/stream/src/streamMeta.c | 15 +- source/libs/stream/src/streamRecover.c | 1 - source/libs/stream/src/streamTask.c | 78 ++--- 17 files changed, 309 insertions(+), 208 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f25250a542..efeeba3759 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3288,7 +3288,7 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq; +} SVPauseStreamTaskReq, SVResetStreamTaskReq; typedef struct { int8_t reserved; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 279cf72f0b..c97b8398dc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -302,13 +302,12 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) -// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_VERUPDATE, "vnode-stream-ver-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 17c3fbf9c6..8367c47464 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -259,8 +259,9 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t startTs; int64_t checkpointId; - int64_t checkpointVer; // latest checkpointId version + int64_t checkpointVer; // latest checkpointId version int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t failedId; // record the latest failed checkpoint id } SCheckpointInfo; typedef struct SStreamStatus { @@ -603,13 +604,15 @@ typedef struct STaskStatusEntry { int32_t status; int32_t stage; int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t offset; // only valid for source task - double inputQUsed; // in MiB + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + int64_t activeCheckpointId; // current active checkpoint id + bool checkpointFailed; // denote if the checkpoint is failed or not + double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dest data size + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dest data size } STaskStatusEntry; typedef struct SStreamHbMsg { @@ -732,7 +735,9 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); -int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver); + +void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); +void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); @@ -768,10 +773,10 @@ void streamMetaInitForSnode(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); +void streamTaskClearCheckInfo(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); -int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 1d1bdcc456..7662c8f8c2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -212,6 +212,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9d6b18c677..130367f5a8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -794,14 +794,13 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; -// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_VERUPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ca06ae3102..be7e6f1fdf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -44,7 +44,8 @@ typedef struct SNodeEntry { typedef struct SStreamExecNodeInfo { SArray *pNodeEntryList; - int64_t ts; // snapshot ts + int64_t ts; // snapshot ts + int64_t activeCheckpoint; // active check point id SHashObj *pTaskMap; SArray *pTaskList; TdThreadMutex lock; @@ -77,14 +78,18 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static SArray *doExtractNodeListFromStream(SMnode *pMnode); +static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); + +static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); +static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); +static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo); static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode); static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); +static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -107,6 +112,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); @@ -579,21 +585,6 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr return 0; } -static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream) { - SStreamObj streamObj = {0}; - memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); - streamObj.status = STREAM_STATUS__RECOVER; - - SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} - static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; @@ -1147,7 +1138,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); } - execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode); + execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); } if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { @@ -1609,7 +1600,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->offset, pe->verStart, pe->verEnd); + sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); } STR_TO_VARSTR(vbuf, buf); @@ -1658,7 +1649,9 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) { +int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { + SArray* tasks = pStream->tasks; + int32_t size = taosArrayGetSize(tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(tasks, i); @@ -1678,16 +1671,6 @@ int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) { return 0; } -int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { - int32_t code = mndPauseAllStreamTaskImpl(pTrans, pStream->tasks); - if (code != 0) { - return code; - } - // pStream->pHTasksList is null - // code = mndPauseAllStreamTaskImpl(pTrans, pStream->pHTasksList); - return code; -} - static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) { SStreamObj streamObj = {0}; memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); @@ -1741,6 +1724,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); return -1; } + mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); @@ -1752,7 +1736,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // pause all tasks if (mndPauseAllStreamTasks(pTrans, pStream) < 0) { - mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); + mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; @@ -1979,20 +1963,9 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update"); + STrans* pTrans = doCreateTrans(pMnode, pStream, "stream-task-update"); if (pTrans == NULL) { - mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return -1; - } - - mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid, - tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); - mndTransDrop(pTrans); - return -1; + return terrno; } taosWLockLatch(&pStream->lock); @@ -2153,7 +2126,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } -static SArray *doExtractNodeListFromStream(SMnode *pMnode) { +static SArray *extractNodeListFromStream(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2174,11 +2147,9 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) { int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - SNodeEntry entry = {0}; - epsetAssign(&entry.epset, &pTask->info.epSet); - entry.nodeId = pTask->info.nodeId; - entry.hbTimestamp = -1; + SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + epsetAssign(&entry.epset, &pTask->info.epSet); taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); } } @@ -2235,24 +2206,28 @@ static int32_t doRemoveFromTask(SStreamExecNodeInfo* pExecNode, STaskId* pRemove return 0; } -static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) { +static bool taskNodeExists(SArray* pList, int32_t nodeId) { + size_t num = taosArrayGetSize(pList); + + for(int32_t i = 0; i < num; ++i) { + SNodeEntry* pEntry = taosArrayGet(pList, i); + if (pEntry->nodeId == nodeId) { + return true; + } + } + + return false; +} + +int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execNodeList.pTaskList); - int32_t numOfVgroups = taosArrayGetSize(pNodeSnapshot); for(int32_t i = 0; i < numOfTask; ++i) { STaskId* pId = taosArrayGet(execNodeList.pTaskList, i); STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, pId, sizeof(*pId)); - bool existed = false; - for(int32_t j = 0; j < numOfVgroups; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(pNodeSnapshot, j); - if (pNodeEntry->nodeId == pEntry->nodeId) { - existed = true; - break; - } - } - + bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { taosArrayPush(pRemoveTaskList, pId); } @@ -2263,15 +2238,18 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) { doRemoveFromTask(&execNodeList, pId); } + mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList), + (int32_t) taosArrayGetSize(execNodeList.pTaskList)); + int32_t size = taosArrayGetSize(pNodeSnapshot); SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry* pExisted = taosArrayGet(execNodeList.pNodeEntryList, i); + SNodeEntry* p = taosArrayGet(execNodeList.pNodeEntryList, i); for(int32_t j = 0; j < size; ++j) { SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j); - if (pEntry->nodeId == pExisted->nodeId) { - taosArrayPush(pValidNodeEntryList, pExisted); + if (pEntry->nodeId == p->nodeId) { + taosArrayPush(pValidNodeEntryList, p); break; } } @@ -2302,7 +2280,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); } - execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode); + execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); } if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { @@ -2315,7 +2293,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); taosThreadMutexLock(&execNodeList.lock); - removeInvalidStreamTask(pNodeSnapshot); + removeExpirednodeEntryAndTask(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { @@ -2343,9 +2321,13 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } typedef struct SMStreamNodeCheckMsg { - int8_t holder; // // to fix windows compile error, define place holder + int8_t placeHolder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; +typedef struct SMStreamTaskResetMsg { + int8_t placeHolder; +} SMStreamTaskResetMsg; + static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -2363,6 +2345,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2373,11 +2356,9 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { - STaskStatusEntry entry = {.id.streamId = pTask->id.streamId, - .id.taskId = pTask->id.taskId, - .stage = -1, - .nodeId = pTask->info.nodeId, - .status = TASK_STATUS__STOP}; + STaskStatusEntry entry = {0}; + streamTaskStatusInit(&entry, pTask); + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); taosArrayPush(pExecNode->pTaskList, &id); mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, @@ -2418,10 +2399,150 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } +static STrans* doCreateTrans(SMnode* pMnode, SStreamObj* pStream, const char* name) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name); + if (pTrans == NULL) { + mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); + + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + terrno = TSDB_CODE_MND_TRANS_CONFLICT; + mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno)); + mndTransDrop(pTrans); + return NULL; + } + + terrno = 0; + return pTrans; +} + +int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) { + STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset"); + if (pTrans == NULL) { + return terrno; + } + + taosWLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + + // todo extract method, with pause stream task + SVResetStreamTaskReq* pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return terrno; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + STransAction action = {0}; + initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + taosWUnLockLatch(&pStream->lock); + mndTransDrop(pTrans); + return terrno; + } + } + } + + taosWUnLockLatch(&pStream->lock); + + int32_t code = mndPersistTransLog(pStream, pTrans); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + +int32_t mndResetFromCheckpoint(SMnode* pMnode) { + // find the checkpoint trans id + int32_t transId = 0; + + { + SSdb *pSdb = pMnode->pSdb; + STrans *pTrans = NULL; + void* pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); + if (pIter == NULL) { + break; + } + + if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) { + transId = pTrans->id; + sdbRelease(pSdb, pTrans); + sdbCancelFetch(pSdb, pIter); + break; + } + + sdbRelease(pSdb, pTrans); + } + } + + if (transId == 0) { + mError("failed to find the checkpoint trans, reset not executed"); + return TSDB_CODE_SUCCESS; + } + + STrans* pTrans = mndAcquireTrans(pMnode, transId); + mndKillTrans(pMnode, pTrans); + + // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); + int32_t code = createStreamResetStatusTrans(pMnode, pStream); + if (code != TSDB_CODE_SUCCESS) { + sdbCancelFetch(pSdb, pIter); + return code; + } + } + + return 0; +} + int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; + bool checkpointFailed = false; + int64_t activeCheckpointId = 0; + SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); @@ -2442,9 +2563,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); + STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); if (pEntry == NULL) { - mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId); + mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); continue; } @@ -2462,16 +2583,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } } else { - pEntry->stage = p->stage; - pEntry->inputQUsed = p->inputQUsed; - pEntry->inputRate = p->inputRate; -// pEntry->outputQUsed = p->outputQUsed; -// pEntry->outputRate = p->outputRate; - pEntry->offset = p->offset; - pEntry->verStart = p->verStart; - pEntry->verEnd = p->verEnd; - pEntry->sinkQuota = p->sinkQuota; - pEntry->sinkDataSize = p->sinkDataSize; + streamTaskStatusCopy(pEntry, p); + if (p->activeCheckpointId != 0) { + if (activeCheckpointId != 0) { + ASSERT(activeCheckpointId == p->activeCheckpointId); + } else { + activeCheckpointId = p->activeCheckpointId; + } + + if (p->checkpointFailed) { + checkpointFailed = p->checkpointFailed; + } + } } pEntry->status = p->status; @@ -2480,6 +2603,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } + // current checkpoint is failed, rollback from the checkpoint trans + // kill the checkpoint trans and then set all tasks status to be normal + if (checkpointFailed && activeCheckpointId != 0) { + if (execNodeList.activeCheckpoint != activeCheckpointId) { + mInfo("checkpointId:%"PRId64" failed, issue task-reset trans to reset all tasks status", activeCheckpointId); + execNodeList.activeCheckpoint = activeCheckpointId; + mndResetFromCheckpoint(pMnode); + } else { + mDebug("checkpoint:%"PRId64" reset has issued already, ignore it", activeCheckpointId); + } + } + taosThreadMutexUnlock(&execNodeList.lock); taosArrayDestroy(req.pTaskStatus); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5d150b731c..7a526016cc 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1695,7 +1695,6 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl int32_t numOfRows = 0; STrans *pTrans = NULL; int32_t cols = 0; - char *pWrite; while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7fb0b6b40a..6451dba2da 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -177,9 +177,6 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); - // send msg to update the nextProcessedVer attribute for this task if it is a stream task - streamBuildAndSendVerUpdateMsg(pTask->pMsgCb, pSnode->pMeta->vgId, &pTask->id, 0); - streamTaskCheckDownstream(pTask); return 0; } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2ffdf2fced..e1b75db723 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -227,7 +227,7 @@ int tqScanWalAsync(STQ* pTq, bool ckPause); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen); +int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b89a671a0e..2c83f91713 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1020,16 +1020,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // only handled in the leader node if (vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); -#if 0 - if (pTq->pVnode->restored) { - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); - if (p != NULL) { - // send msg to update the nextProcessedVer attribute for this task if it is a stream task - streamBuildAndSendVerUpdateMsg(p->pMsgCb, vgId, &p->id, sversion); - streamMetaReleaseTask(pStreamMeta, p); - } - } -#endif SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); bool restored = pTq->pVnode->restored; @@ -1670,7 +1660,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; - // disable auto rsp to source + // disable auto rsp to mnode pRsp->info.handle = NULL; // todo: add counter to make sure other tasks would not be trapped in checkpoint state @@ -1714,9 +1704,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } - // todo: handle the partial failure cases // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { + pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id + pTask->checkpointingId = req.checkpointId; + qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); streamMetaReleaseTask(pMeta, pTask); @@ -1932,34 +1924,25 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { return rsp.code; } -int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen) { +int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*) pMsg->pCont; + SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = pMeta->vgId; - - SVStreamTaskVerUpdateReq* pReq = (SVStreamTaskVerUpdateReq*) pMsg; - tqDebug("vgId:%d receive msg to update task dataVer, task:0x%x dataVer:%" PRId64, vgId, pReq->taskId, pReq->dataVer); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { - tqError("vgId:%d process dataVer msg, failed to find task:0x%x, it may have been destroyed", vgId, pReq->taskId); - return -1; + tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + pReq->taskId); + return TSDB_CODE_SUCCESS; } - // commit the dataVer update - streamTaskUpdateDataVer(pTask, pReq->dataVer); + tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); - if (vnodeIsLeader(pTq->pVnode)) { - if (pTq->pVnode->restored) { - ASSERT(pTask->execInfo.init == 0); - - pTask->execInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%" PRId64, pTask->id.idStr, pTask->execInfo.init); - streamTaskCheckDownstream(pTask); - } else { - tqWarn("s-task:%s not launched since vnode (vgId:%d) not ready", pTask->id.idStr, vgId); - } + // clear flag set during do checkpoint, and open inputQ for all upstream tasks + if (pTask->status.taskStatus == TASK_STATUS__CK) { + streamTaskClearCheckInfo(pTask); + streamSetStatusNormal(pTask); } streamMetaReleaseTask(pMeta, pTask); - return 0; -} + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index dc2876790a..31eede270f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -309,7 +309,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con SWalCont* pCont = &pReader->pHead->head; int64_t ver = pCont->version; if (ver > maxVer) { - tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); + tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8ed9f81c92..f9ce398c7f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -583,9 +583,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; - case TDMT_STREAM_TASK_VERUPDATE: - tqProcessTaskDataVerUpdateReq(pVnode->pTq, pMsg->pCont, pMsg->contLen); - break; + case TDMT_VND_STREAM_TASK_RESET: { + if (pVnode->restored/* && vnodeIsLeader(pVnode)*/) { + tqProcessTaskResetReq(pVnode->pTq, pMsg); + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 805635f603..e963718f71 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1317,8 +1317,9 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t } static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { - qInfo("do stream range scan. windows index:%d", *pRowIndex); + qDebug("do stream range scan. windows index:%d", *pRowIndex); bool prepareRes = true; + while (1) { SSDataBlock* pResult = NULL; pResult = doTableScan(pInfo->pTableScanOp); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6924d99585..a26d667421 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -143,8 +143,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->execInfo.checkpoint += 1; - // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into - // inputQ, to make sure all blocks with less version have been handled by this task already. + // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + // already. int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); return code; } @@ -264,6 +264,16 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { return 0; } +void streamTaskClearCheckInfo(SStreamTask* pTask) { + pTask->checkpointingId = 0; // clear the checkpoint id + pTask->chkInfo.failedId = 0; + pTask->chkInfo.startTs = 0; // clear the recorded start time + pTask->checkpointNotReadyTasks = 0; + pTask->checkpointAlignCnt = 0; + taosArrayClear(pTask->pReadyMsgList); + streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks +} + int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { taosWLockLatch(&pMeta->lock); @@ -283,11 +293,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); p->chkInfo.checkpointId = p->checkpointingId; + streamTaskClearCheckInfo(p); streamSetStatusNormal(p); // save the task streamMetaSaveTask(pMeta, p); - streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks stDebug( "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " @@ -318,8 +328,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { ASSERT(remain >= 0); double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; - pTask->chkInfo.startTs = 0; // clear the recorded start time - if (remain == 0) { // all tasks are ready stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 43707098bc..53516202c0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -778,9 +778,11 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; - if (tEncodeI64(pEncoder, ps->offset) < 0) return -1; + if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; + if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -805,9 +807,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.offset) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -895,8 +899,13 @@ void metaHbToMnode(void* param, void* tmrId) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } + if ((*pTask)->checkpointingId != 0) { + entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->checkpointingId); + entry.activeCheckpointId = (*pTask)->checkpointingId; + } + if ((*pTask)->exec.pWalReader != NULL) { - entry.offset = (*pTask)->chkInfo.nextProcessVer; + entry.processedVer = (*pTask)->chkInfo.nextProcessVer - 1; walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 55ed555af6..43c1b84fa7 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -305,7 +305,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { streamLaunchFillHistoryTask(pTask); } -// todo handle error int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b6a60e28d7..37af1ce64f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -700,63 +700,6 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver) { - SVStreamTaskVerUpdateReq* pReq = rpcMallocCont(sizeof(SVStreamTaskVerUpdateReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReq->head.vgId = vgId; - pReq->taskId = pTaskId->taskId; - pReq->streamId = pTaskId->streamId; - pReq->dataVer = ver; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_VERUPDATE, .pCont = pReq, .contLen = sizeof(SVStreamTaskVerUpdateReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d failed to send update task:0x%x dataVer msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); - return code; - } - - stDebug("vgId:%d build and send update table:0x%x dataVer:%"PRId64" msg", vgId, pTaskId->taskId, ver); - return code; -} - -int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver) { - SStreamMeta* pMeta = pTask->pMeta; - - // commit the dataVer update - int64_t prevVer = 0; - taosThreadMutexLock(&pTask->lock); - - if (pTask->chkInfo.checkpointId == 0) { - prevVer = pTask->chkInfo.nextProcessVer; - pTask->chkInfo.nextProcessVer = ver; - taosThreadMutexUnlock(&pTask->lock); - - taosWLockLatch(&pMeta->lock); - if (streamMetaSaveTask(pMeta, pTask) < 0) { -// return -1; - } - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - - stDebug("s-task:%s nextProcessedVer is update from %" PRId64 " to %" PRId64 " checkpointId:%" PRId64 - " checkpointVer:%" PRId64, - pTask->id.idStr, prevVer, ver, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer); - taosWUnLockLatch(&pMeta->lock); - } else { - stDebug("s-task:%s not update the dataVer, existed:%" PRId64 ", checkpointId:%" PRId64 " checkpointVer:%" PRId64, - pTask->id.idStr, pTask->chkInfo.nextProcessVer, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer); - taosThreadMutexUnlock(&pTask->lock); - } - - return TSDB_CODE_SUCCESS; -} - STaskId streamTaskExtractKey(const SStreamTask* pTask) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; return id; @@ -788,4 +731,25 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__UNINIT: return "uninitialized"; default:return ""; } +} + +void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask) { + pEntry->id.streamId = pTask->id.streamId; + pEntry->id.taskId = pTask->id.taskId; + pEntry->stage = -1; + pEntry->nodeId = pTask->info.nodeId; + pEntry->status = TASK_STATUS__STOP; +} + +void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) { + pDst->stage = pSrc->stage; + pDst->inputQUsed = pSrc->inputQUsed; + pDst->inputRate = pSrc->inputRate; + pDst->processedVer = pSrc->processedVer; + pDst->verStart = pSrc->verStart; + pDst->verEnd = pSrc->verEnd; + pDst->sinkQuota = pSrc->sinkQuota; + pDst->sinkDataSize = pSrc->sinkDataSize; + pDst->activeCheckpointId = pSrc->activeCheckpointId; + pDst->checkpointFailed = pSrc->checkpointFailed; } \ No newline at end of file