From d0675a660ef2811ea8c246d63e47521d3ae503cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jun 2024 17:53:53 +0800 Subject: [PATCH 01/21] fix(stream): init the node list and task list when starting mnode. --- source/dnode/mnode/impl/inc/mndStream.h | 5 +- source/dnode/mnode/impl/src/mndMain.c | 2 + source/dnode/mnode/impl/src/mndStream.c | 74 ++++++++------------- source/dnode/mnode/impl/src/mndStreamHb.c | 60 ----------------- source/dnode/mnode/impl/src/mndStreamUtil.c | 42 ++++++++++++ 5 files changed, 75 insertions(+), 108 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6aed50e508..c8a967620c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -120,8 +120,9 @@ void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); -void removeExpiredNodeInfo(const SArray *pNodeSnapshot); -void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo); +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..9252843d2f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -680,6 +680,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } + mndInitStreamExecInfo(pMnode, &execInfo); + mInfo("mnode open successfully"); return pMnode; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5fc6e465ad..1f1eaa999e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -56,13 +56,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); +static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +static void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); +static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); +static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -131,9 +132,11 @@ int32_t mndInitStream(SMnode *pMnode) { if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; } + if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) { return -1; } + return 0; } @@ -2169,7 +2172,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } -static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { +static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2215,48 +2218,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -static bool taskNodeExists(SArray *pList, int32_t nodeId) { - size_t num = taosArrayGetSize(pList); - - for (int32_t i = 0; i < num; ++i) { - SNodeEntry *pEntry = taosArrayGet(pList, i); - if (pEntry->nodeId == nodeId) { - return true; - } - } - - return false; -} - -int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { - SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); - - int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); - for (int32_t i = 0; i < numOfTask; ++i) { - STaskId *pId = taosArrayGet(execInfo.pTaskList, i); - - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); - if (pEntry->nodeId == SNODE_HANDLE) { - continue; - } - - bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); - if (!existed) { - taosArrayPush(pRemovedTasks, pId); - } - } - - removeTasksInBuf(pRemovedTasks, &execInfo); - - mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), - (int32_t)taosArrayGetSize(execInfo.pTaskList)); - - removeExpiredNodeInfo(pNodeSnapshot); - - taosArrayDestroy(pRemovedTasks); - return 0; -} - // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2510,3 +2471,24 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } return code; } + +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + addAllStreamTasksIntoBuf(pMnode, pExecInfo); + extractNodeListFromStream(pMnode, pExecInfo->pNodeList); +} + +void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); + sdbRelease(pSdb, pStream); + } +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 42efb6589e..6f398fbc11 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,55 +22,6 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - - saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); - sdbRelease(pSdb, pStream); - } -} - -static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { - if (pMnode == NULL) { - return; - } - - int32_t num = taosArrayGetSize(pExecInfo->pTaskList); - - SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - SArray *pIdList = taosArrayInit(4, sizeof(STaskId)); - - for (int32_t i = 0; i < num; ++i) { - STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i); - - void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t)); - if (p != NULL) { - continue; - } - - void* pObj = mndGetStreamObj(pMnode, pId->streamId); - if (pObj != NULL) { - mndReleaseStream(pMnode, pObj); - taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0); - } else { - taosArrayPush(pIdList, pId); - } - } - - removeTasksInBuf(pIdList, &execInfo); - - taosArrayDestroy(pIdList); - taosHashCleanup(pHash); -} - static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { @@ -290,17 +241,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); - // extract stream task list - if (taosHashGetSize(execInfo.pTaskMap) == 0) { - addAllStreamTasksIntoBuf(pMnode, &execInfo); - } else { - // the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty. - // let's drop them here. - removeDroppedStreamTasksInBuf(pMnode, &execInfo); - } - - extractStreamNodeList(pMnode); - int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index d138254afd..7e72f67f45 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -645,4 +645,46 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { taosThreadMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); +} + +static bool taskNodeExists(SArray *pList, int32_t nodeId) { + size_t num = taosArrayGetSize(pList); + + for (int32_t i = 0; i < num; ++i) { + SNodeEntry *pEntry = taosArrayGet(pList, i); + if (pEntry->nodeId == nodeId) { + return true; + } + } + + return false; +} + +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { + SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); + + int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); + for (int32_t i = 0; i < numOfTask; ++i) { + STaskId *pId = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->nodeId == SNODE_HANDLE) { + continue; + } + + bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); + if (!existed) { + taosArrayPush(pRemovedTasks, pId); + } + } + + removeTasksInBuf(pRemovedTasks, &execInfo); + + mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), + (int32_t)taosArrayGetSize(execInfo.pTaskList)); + + removeExpiredNodeInfo(pNodeSnapshot); + + taosArrayDestroy(pRemovedTasks); + return 0; } \ No newline at end of file From cbf8b363fc3317f4c638531a220785b6da3857e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 10 Jun 2024 00:46:01 +0800 Subject: [PATCH 02/21] fix(stream): update checkpoint info by using trans. --- include/common/tmsg.h | 4 - include/common/tmsgdef.h | 5 +- include/dnode/vnode/tqCommon.h | 1 + include/libs/stream/streammsg.h | 14 ++ include/libs/stream/tstream.h | 3 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 9 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/inc/mndStream.h | 34 +++- source/dnode/mnode/impl/src/mndMain.c | 2 - source/dnode/mnode/impl/src/mndStream.c | 201 +++++++++++++++++++- source/dnode/mnode/impl/src/mndStreamHb.c | 19 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 112 +++++++++++ source/dnode/snode/src/snode.c | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 4 + source/dnode/vnode/src/tqCommon/tqCommon.c | 2 + source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/stream/src/streamCheckpoint.c | 22 ++- source/libs/stream/src/streamTask.c | 65 ++++--- source/libs/stream/src/streammsg.c | 28 +++ 21 files changed, 465 insertions(+), 68 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa294c3fc5..98ca414466 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3627,10 +3627,6 @@ typedef struct { int32_t taskId; } SVPauseStreamTaskReq, SVResetStreamTaskReq; -typedef struct { - int8_t reserved; -} SVPauseStreamTaskRsp; - typedef struct { char name[TSDB_STREAM_FNAME_LEN]; int8_t igNotExists; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index d1ac2c79c3..db5ae21692 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -207,9 +207,9 @@ TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // unused,reserved TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL) @@ -370,6 +370,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0076d79312..0fb690e756 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -28,6 +28,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); diff --git a/include/libs/stream/streammsg.h b/include/libs/stream/streammsg.h index 91bfc6afc8..723c9fd099 100644 --- a/include/libs/stream/streammsg.h +++ b/include/libs/stream/streammsg.h @@ -190,6 +190,20 @@ typedef struct SCheckpointTriggerRsp { int32_t rspCode; } SCheckpointTriggerRsp; +typedef struct SCheckpointReport { + int64_t streamId; + int32_t taskId; + int32_t nodeId; + int64_t checkpointId; + int64_t checkpointVer; + int64_t checkpointTs; + int32_t transId; + int8_t dropHTask; +} SCheckpointReport; + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq); +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq); + typedef struct { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d07a302920..1996c2ed63 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -762,8 +762,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t setCode); -int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, - SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); +int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index e4afba6722..3d5da860ad 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -232,6 +232,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -240,6 +241,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 9b07b6a3d8..856d7b2051 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -75,26 +75,27 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index bfc9e92293..41b9352f18 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -967,6 +967,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToWriteQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index c8a967620c..6ca61265bb 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -26,13 +26,14 @@ extern "C" { #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_VER_NUMBER 5 -#define MND_STREAM_CREATE_NAME "stream-create" -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" -#define MND_STREAM_PAUSE_NAME "stream-pause" -#define MND_STREAM_RESUME_NAME "stream-resume" -#define MND_STREAM_DROP_NAME "stream-drop" -#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" -#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" +#define MND_STREAM_CREATE_NAME "stream-create" +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" +#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" typedef struct SStreamTransInfo { int64_t startTime; @@ -51,6 +52,7 @@ typedef struct SStreamTransMgmt { } SStreamTransMgmt; typedef struct SStreamExecInfo { + bool initTaskList; SArray *pNodeList; int64_t ts; // snapshot ts SStreamTransMgmt transMgmt; @@ -58,6 +60,7 @@ typedef struct SStreamExecInfo { SArray *pTaskList; TdThreadMutex lock; SHashObj *pTransferStateStreams; + SHashObj *pChkptStreams; } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -78,7 +81,18 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg; +} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; + +typedef struct STaskChkptInfo { + int32_t nodeId; + int32_t taskId; + int64_t streamId; + int64_t checkpointId; + int64_t version; + int64_t ts; + int32_t transId; + int8_t dropHTask; +}STaskChkptInfo; int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); @@ -114,6 +128,10 @@ int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *p int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); +void scanCheckpointReportInfo(SMnode *pMnode); +void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 9252843d2f..cad8c6d745 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -680,8 +680,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - mndInitStreamExecInfo(pMnode, &execInfo); - mInfo("mnode open successfully"); return pMnode; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1f1eaa999e..7ee34d502a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -58,10 +58,10 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); +static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); @@ -104,6 +104,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); // for msgs inside mnode // TODO change the name @@ -114,6 +115,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); @@ -1091,7 +1093,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { - bool ready = true; + bool ready = true; if (taskNodeIsUpdated(pMnode)) { return -1; } @@ -1102,6 +1104,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); } + SArray* pInvalidList = taosArrayInit(4, sizeof(STaskId)); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); @@ -1109,11 +1113,32 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { continue; } + // for stopped stream task entry, we do additional check for it + if (pEntry->status == TASK_STATUS__STOP) { + bool invalid = false; + for(int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { + STaskId* pId = taosArrayGet(pInvalidList, j); + if (pEntry->id.streamId == pId->streamId) { + taosArrayPush(pInvalidList, &pEntry->id); + invalid = true; + break; + } + } + + if (!invalid) { + SStreamObj *pObj = mndGetStreamObj(pMnode, pEntry->id.streamId); + if (pObj == NULL) { + taosArrayPush(pInvalidList, &pEntry->id); + } else { + mndReleaseStream(pMnode, pObj); + } + } + } + if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; - break; } if (pEntry->hTaskId != 0) { @@ -1126,6 +1151,8 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } + removeTasksInBuf(pInvalidList, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); return ready ? 0 : -1; } @@ -1801,6 +1828,10 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfRows = 0; SStreamObj *pStream = NULL; + taosThreadMutexLock(&execInfo.lock); + mndInitStreamExecInfo(pMnode, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) { @@ -2437,13 +2468,136 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); { - SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; rsp.pCont = rpcMallocCont(rsp.contLen); SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); tmsgSendRsp(&rsp); + pReq->info.handle = NULL; // disable auto rsp + } + return 0; +} + +static void doAddTaskInfo(SArray* pList, SCheckpointReport* pReport) { + bool existed = false; + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + STaskChkptInfo* p = taosArrayGet(pList ,i); + if (p->taskId == pReport->taskId) { + existed = true; + break; + } + } + + if (!existed) { + STaskChkptInfo info = { + .streamId = pReport->streamId, + .taskId = pReport->taskId, + .transId = pReport->transId, + .dropHTask = pReport->dropHTask, + .version = pReport->checkpointVer, + .ts = pReport->checkpointTs, + .checkpointId = pReport->checkpointId, + .nodeId = pReport->nodeId, + }; + taosArrayPush(pList, &info); + } +} + +int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SCheckpointReport req = {0}; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeStreamTaskChkptReport(&decoder, &req)) { + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + mError("invalid task checkpoint-report msg received"); + return -1; + } + tDecoderClear(&decoder); + + mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64 + " checkpointVer:%" PRId64 " transId:%d", + req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); + + // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. + taosThreadMutexLock(&execInfo.lock); + + SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + if (pStream == NULL) { + mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", + req.streamId); + + // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } + } + + int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); + + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + if (pReqTaskList == NULL) { + SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); + doAddTaskInfo(pList, &req); + taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); + + pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + } else { + doAddTaskInfo(*pReqTaskList, &req); + } + + int32_t total = taosArrayGetSize(*pReqTaskList); + if (total == numOfTasks) { // all tasks has send the reqs + mInfo("stream:0x%" PRIx64 + " %s all %d tasks send checkpoint-report, start to update checkpoint meta-info for checkpointId:%" PRId64, + req.streamId, pStream->name, total, req.checkpointId); + + if (pStream != NULL) { + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (conflict) { + mDebug("stream:%x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); + } else { + int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); + if (code == TSDB_CODE_SUCCESS) { // remove this entry + taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, numOfStreams); + } else { + mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + req.streamId); + } + } + } + } + + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + + taosThreadMutexUnlock(&execInfo.lock); + + { + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamUpdateChkptRsp)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + tmsgSendRsp(&rsp); pReq->info.handle = NULL; // disable auto rsp } @@ -2473,8 +2627,13 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + if (pExecInfo->initTaskList) { + return; + } + addAllStreamTasksIntoBuf(pMnode, pExecInfo); extractNodeListFromStream(pMnode, pExecInfo->pNodeList); + pExecInfo->initTaskList = true; } void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { @@ -2491,4 +2650,38 @@ void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); sdbRelease(pSdb, pStream); } +} + +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray* pChkptInfoList) { + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, "update checkpoint-info"); + if (pTrans == NULL) { + return terrno; + } + + /*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); + int32_t code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream); + if (code != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 6f398fbc11..7c07d003b7 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -241,6 +241,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); + mndInitStreamExecInfo(pMnode, &execInfo); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); @@ -266,18 +268,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { snodeChanged = true; } } else { - // task is idle for more than 50 sec. -// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { -// if (!pTaskEntry->inputQChanging) { -// pTaskEntry->inputQUnchangeCounter++; -// } else { -// pTaskEntry->inputQChanging = false; -// } -// } else { -// pTaskEntry->inputQChanging = true; -// pTaskEntry->inputQUnchangeCounter = 0; -// } - streamTaskStatusCopy(pTaskEntry, p); STaskCkptInfo *pChkInfo = &p->checkpointInfo; @@ -288,6 +278,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; addIntoCheckpointList(pFailedChkpt, &info); + + // remove failed trans from pChkptStreams + taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); } } @@ -333,6 +326,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } + scanCheckpointReportInfo(pMnode); + taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7e72f67f45..695c33f646 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -574,6 +574,7 @@ void mndInitExecInfo() { execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); @@ -687,4 +688,115 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { taosArrayDestroy(pRemovedTasks); return 0; +} + +static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return terrno; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); + ASSERT(pReqTaskList); + + int32_t size = taosArrayGetSize(*pReqTaskList); + for(int32_t i = 0; i < size; ++i) { + STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); + if (pInfo->taskId == pTask->id.taskId) { + pReq->checkpointId = pInfo->checkpointId; + pReq->checkpointVer = pInfo->version; + pReq->checkpointTs = pInfo->ts; + pReq->dropRelHTask = pInfo->dropHTask; + pReq->transId = pInfo->transId; + } + } + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(pReq); + return code; + } + + code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); + } + + return code; +} + +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + taosWLockLatch(&pStream->lock); + + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + + int32_t code = doSetUpdateChkptAction(pMnode, pTrans, pTask); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return 0; +} + +void scanCheckpointReportInfo(SMnode* pMnode) { + void* pIter = NULL; + mDebug("start to scan checkpoint report info"); + + while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { + SArray *pList = *(SArray **)pIter; + + STaskChkptInfo* pInfo = taosArrayGet(pList, 0); + SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); + if (pStream == NULL) { + mError("failed to acquire stream:0x%"PRIx64" remove it from checkpoint-report list", pInfo->streamId); + taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); + continue; + } + + int32_t total = mndGetNumOfStreamTasks(pStream); + int32_t existed = (int32_t) taosArrayGetSize(pList); + + if (total == existed) { + mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", + pStream->uid, pStream->name, total); + + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (!conflict) { + int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry + taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId, numOfStreams); + } else { + mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + pInfo->streamId); + } + } else { + mDebug("stream:%x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); + } + } else { + mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, + existed, total, total - existed); + } + + sdbRelease(pMnode->pSdb, pStream); + } + } \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c61988574c..b0d61ebc06 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -136,6 +136,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqStreamProcessChkptReportRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER: return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8222af4d60..7f5ab8b6e6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -261,6 +261,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 712cfbaa55..a2ca3662d7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1278,3 +1278,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c55745e5c5..7c8ad159bc 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1078,6 +1078,8 @@ int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return d int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {return doProcessDummyRspMsg(pMeta, pMsg);} + int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 002f04b8a7..b1f353af81 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -854,6 +854,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ab3b5d6fa0..56ac794d8e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -441,8 +441,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored", - id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer); + " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " transId:%d ignored", + id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, + pReq->transId); taosThreadMutexUnlock(&pTask->lock); { // destroy the related fill-history tasks @@ -703,9 +705,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null. if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) { - STaskId* pHTaskId = &pTask->hTaskInfo.id; - code = streamBuildAndSendCheckpointUpdateMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, pHTaskId, &pTask->chkInfo, - dropRelHTask); + code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask); if (code == TSDB_CODE_SUCCESS) { code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { @@ -770,6 +770,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { streamMetaReleaseTask(pTask->pMeta, pTask); return; } + + // checkpoint-trigger recv flag is set, quit + if (pActiveInfo->allUpstreamTriggerRecv) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", + pTask->id.idStr, vgId, ref); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + taosThreadMutexUnlock(&pTask->lock); taosThreadMutexLock(&pActiveInfo->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc9d2166e6..f8a4deb6b1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -731,35 +731,50 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId, - SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { - SVUpdateCheckpointInfoReq* pReq = rpcMallocCont(sizeof(SVUpdateCheckpointInfoReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; +int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { + int32_t code; + int32_t tlen = 0; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActive = pCheckpointInfo->pActiveInfo; + + SCheckpointReport req = {.streamId = pTask->id.streamId, + .taskId = pTask->id.taskId, + .nodeId = vgId, + .dropHTask = dropRelHTask, + .transId = pActive->transId, + .checkpointId = pActive->activeId, + .checkpointVer = pCheckpointInfo->processedVer, + .checkpointTs = pCheckpointInfo->startTs}; + + tEncodeSize(tEncodeStreamTaskChkptReport, &req, tlen, code); + if (code < 0) { + stError("s-task:%s vgId:%d encode stream task checkpoint-report failed, code:%s", id, vgId, tstrerror(code)); return -1; } - pReq->head.vgId = vgId; - pReq->taskId = pTaskId->taskId; - pReq->streamId = pTaskId->streamId; - pReq->dropRelHTask = dropRelHTask; - pReq->hStreamId = pHTaskId->streamId; - pReq->hTaskId = pHTaskId->taskId; - pReq->transId = pCheckpointInfo->pActiveInfo->transId; - - pReq->checkpointId = pCheckpointInfo->pActiveInfo->activeId; - pReq->checkpointVer = pCheckpointInfo->processedVer; - pReq->checkpointTs = pCheckpointInfo->startTs; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_UPDATE_CHKPT, .pCont = pReq, .contLen = sizeof(SVUpdateCheckpointInfoReq)}; - int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); - - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d task:0x%x failed to send update checkpoint info msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); - } else { - stDebug("vgId:%d task:0x%x build and send update checkpoint info msg msg", vgId, pTaskId->taskId); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; } - return code; + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) { + rpcFreeCont(buf); + stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code)); + return -1; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_REPORT, buf, tlen); + stDebug("s-task:%s vgId:%d build and send task checkpoint-report to mnode", id, vgId); + + tmsgSendReq(&pTask->info.mnodeEpset, &msg); + return 0; } STaskId streamTaskGetTaskId(const SStreamTask* pTask) { diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index 705406f044..a6ab6a60c2 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -594,6 +594,34 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->checkpointTs) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->dropHTask) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->checkpointTs) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1; tEndDecode(pDecoder); return 0; } \ No newline at end of file From 6effeabc8e2a55d6cff6243afc1985044b514b1c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 10:53:32 +0800 Subject: [PATCH 03/21] fix(stream): handle checkpoint-info-update in write queue. --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 3d5da860ad..e137bcbdec 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -241,7 +241,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index ff31aa0f7d..8137081631 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -159,7 +159,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo return NULL; } - mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); + mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { From 68aac5dee10c5eded4ee062d60e6ed5cce1fa266 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 11:12:24 +0800 Subject: [PATCH 04/21] other: fix syntax error. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7ee34d502a..c1747f9379 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2569,7 +2569,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { if (pStream != NULL) { bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); if (conflict) { - mDebug("stream:%x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); + mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); } else { int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); if (code == TSDB_CODE_SUCCESS) { // remove this entry diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 695c33f646..f7b33c5e75 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -789,7 +789,7 @@ void scanCheckpointReportInfo(SMnode* pMnode) { pInfo->streamId); } } else { - mDebug("stream:%x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); + mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); } } else { mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, From 00eb6218257811d8eab3546100bc7c5a6e71da8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 14:09:50 +0800 Subject: [PATCH 05/21] fix(stream): do scan checkpoint-report in write queue. --- include/common/tmsgdef.h | 2 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 8 ++++-- source/dnode/mnode/impl/src/mndStreamHb.c | 13 ++++++++- source/dnode/mnode/impl/src/mndStreamUtil.c | 32 +++++++++++++++------ source/libs/stream/src/streamCheckpoint.c | 20 +------------ source/libs/stream/src/streamExec.c | 17 +++++------ 9 files changed, 52 insertions(+), 46 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a07830af19..1a25bac0c8 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -207,7 +207,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // unused,reserved + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 856d7b2051..6a9f2f1275 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -95,7 +95,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 41b9352f18..7bc41559c3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -967,7 +967,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToWriteQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6ca61265bb..9289909b19 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -130,7 +130,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamO int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); -void scanCheckpointReportInfo(SMnode *pMnode); +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c1747f9379..ab72996058 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -115,10 +115,11 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -1181,7 +1182,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { - return code; + terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + return -1; } SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 7c07d003b7..2cb4111e97 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,6 +22,8 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; +static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); + static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { @@ -326,7 +328,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } - scanCheckpointReportInfo(pMnode); + mndStreamStartUpdateCheckpointInfo(pMnode); taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); @@ -346,3 +348,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } + +void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg + SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + if (pMsg != NULL) { + int32_t size = sizeof(SMStreamDoCheckpointMsg); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f7b33c5e75..7a45ce1f2a 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -754,8 +754,11 @@ int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj return 0; } -void scanCheckpointReportInfo(SMnode* pMnode) { - void* pIter = NULL; +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + void *pIter = NULL; + SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); + mDebug("start to scan checkpoint report info"); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { @@ -764,8 +767,8 @@ void scanCheckpointReportInfo(SMnode* pMnode) { STaskChkptInfo* pInfo = taosArrayGet(pList, 0); SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); if (pStream == NULL) { - mError("failed to acquire stream:0x%"PRIx64" remove it from checkpoint-report list", pInfo->streamId); - taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); + mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId); + taosArrayPush(pDropped, &pInfo->streamId); continue; } @@ -780,14 +783,13 @@ void scanCheckpointReportInfo(SMnode* pMnode) { if (!conflict) { int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry - taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId)); - - int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId, numOfStreams); + taosArrayPush(pDropped, &pInfo->streamId); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId); } else { - mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", pInfo->streamId); } + break; } else { mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); } @@ -799,4 +801,16 @@ void scanCheckpointReportInfo(SMnode* pMnode) { sdbRelease(pMnode->pSdb, pStream); } + if (taosArrayGetSize(pDropped) > 0) { + for (int32_t i = 0; i < taosArrayGetSize(pDropped); ++i) { + int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); + taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); + } + + int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", numOfStreams); + } + + taosArrayDestroy(pDropped); + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 56ac794d8e..1583734a99 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -40,26 +40,8 @@ static void checkpointTriggerMonitorFn(void* param, void* tmrId); static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); -bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) { - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList); - bool allSend = true; - - taosThreadMutexLock(&pActiveInfo->lock); - int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList); - - if (numOfRecv < numOfUpstreams) { - stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d", - pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams); - allSend = false; - } - - taosThreadMutexUnlock(&pActiveInfo->lock); - return allSend; -} - SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId) { + int32_t transId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1828409f89..98168abae1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -615,7 +615,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB * appropriate batch of blocks should be handled in 5 to 10 sec. */ static int32_t doStreamExecTask(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. stDebug("s-task:%s start to extract data block from inputQ", id); @@ -699,20 +699,15 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (type != STREAM_INPUT__CHECKPOINT) { doStreamTaskExecImpl(pTask, pInput); - } - - streamFreeQitem(pInput); - - // todo other thread may change the status + streamFreeQitem(pInput); + } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. - if (type == STREAM_INPUT__CHECKPOINT) { - // todo add lock + taosThreadMutexLock(&pTask->lock); SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name); streamTaskBuildCheckpoint(pTask); - } else { - // todo refactor + } else { // todo refactor int32_t code = 0; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); @@ -727,6 +722,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } + taosThreadMutexUnlock(&pTask->lock); + streamFreeQitem(pInput); return 0; } } From 0245d82e54990cd78a9128fe5b99745a941c67f1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 14:26:29 +0800 Subject: [PATCH 06/21] fix(stream): make sure that the unit test case can work. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 4 +++- source/dnode/mnode/impl/src/mndStreamUtil.c | 7 ++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ab72996058..2fb0505d86 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2629,7 +2629,7 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { } void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { - if (pExecInfo->initTaskList) { + if (pExecInfo->initTaskList || pMnode == NULL) { return; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 2cb4111e97..a79fe0cf0a 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -328,7 +328,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mndDropOrphanTasks(pMnode, pOrphanTasks); } - mndStreamStartUpdateCheckpointInfo(pMnode); + if (pMnode != NULL) { // make sure that the unit test case can work + mndStreamStartUpdateCheckpointInfo(pMnode); + } taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7a45ce1f2a..915235ab47 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -801,14 +801,15 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); } - if (taosArrayGetSize(pDropped) > 0) { - for (int32_t i = 0; i < taosArrayGetSize(pDropped); ++i) { + int32_t size = taosArrayGetSize(pDropped); + if (size > 0) { + for (int32_t i = 0; i < size; ++i) { int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); } int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", numOfStreams); + mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); } taosArrayDestroy(pDropped); From d6e252513e4350fc5571b740e59cf2e327f3b14f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 14:33:59 +0800 Subject: [PATCH 07/21] fix(stream):fix syntax error. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 915235ab47..031b97ce54 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -784,7 +784,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry taosArrayPush(pDropped, &pInfo->streamId); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId); + mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); } else { mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", pInfo->streamId); From 3b537367c924fd82d10887c6ca682d7ee3636c8e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 15:21:48 +0800 Subject: [PATCH 08/21] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStream.c | 1 + source/dnode/mnode/impl/src/mndStreamUtil.c | 1 + 2 files changed, 2 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2fb0505d86..09189bf593 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -149,6 +149,7 @@ void mndCleanupStream(SMnode *pMnode) { taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.pTransferStateStreams); + taosHashCleanup(execInfo.pChkptStreams); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 031b97ce54..6745acac34 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -578,6 +578,7 @@ void mndInitExecInfo() { execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); + taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); } void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { From 6b1d4b842e78fd0c4473b568e320a3af93d9798a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 11 Jun 2024 09:54:53 +0800 Subject: [PATCH 09/21] fix: case when memory leak --- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesUtilFuncs.c | 2 ++ tests/parallel_test/cases.task | 1 + tests/script/tsim/query/nestedJoinView.sim | 19 +++++++++++++++++++ 4 files changed, 23 insertions(+) create mode 100644 tests/script/tsim/query/nestedJoinView.sim diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 84d3f734fe..f02fefd977 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -843,6 +843,7 @@ static int32_t selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) { CLONE_NODE_FIELD_EX(pLimit, SLimitNode*); COPY_CHAR_ARRAY_FIELD(stmtName); COPY_SCALAR_FIELD(precision); + COPY_SCALAR_FIELD(isSubquery); COPY_SCALAR_FIELD(isEmptyResult); COPY_SCALAR_FIELD(timeLineResMode); COPY_SCALAR_FIELD(timeLineFromOrderBy); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index beedffc4f2..c081841b3b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -963,12 +963,14 @@ void nodesDestroyNode(SNode* pNode) { break; case QUERY_NODE_WHEN_THEN: { SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode; + destroyExprNode((SExprNode*)pNode); nodesDestroyNode(pWhenThen->pWhen); nodesDestroyNode(pWhenThen->pThen); break; } case QUERY_NODE_CASE_WHEN: { SCaseWhenNode* pCaseWhen = (SCaseWhenNode*)pNode; + destroyExprNode((SExprNode*)pNode); nodesDestroyNode(pCaseWhen->pCase); nodesDestroyNode(pCaseWhen->pElse); nodesDestroyList(pCaseWhen->pWhenThenList); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1eaf17ccb1..307260cf6a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1485,6 +1485,7 @@ ,,y,script,./test.sh -f tsim/view/view.sim ,,y,script,./test.sh -f tsim/query/cache_last.sim ,,y,script,./test.sh -f tsim/query/const.sim +,,y,script,./test.sh -f tsim/query/nestedJoinView.sim #develop test diff --git a/tests/script/tsim/query/nestedJoinView.sim b/tests/script/tsim/query/nestedJoinView.sim new file mode 100644 index 0000000000..efdec9fcbc --- /dev/null +++ b/tests/script/tsim/query/nestedJoinView.sim @@ -0,0 +1,19 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql create database test; +sql use test; + +sql CREATE TABLE `resource_info` ( job_id_ts TIMESTAMP , role VARCHAR(20) primary key, start_time TIMESTAMP, ip VARCHAR(15), cpu FLOAT, memory FLOAT, io_write FLOAT, io_read FLOAT, net_write FLOAT, net_read FLOAT) TAGS ( end_time TIMESTAMP); + +sql CREATE STABLE `test_results` ( `job_id_ts` TIMESTAMP , `end_time` VARCHAR(40) PRIMARY KEY, `job_id` BIGINT, `time_cost` FLOAT, `write_speed` FLOAT, `qps` FLOAT, `min_delay` FLOAT, `p90_delay` FLOAT, `p95_delay` FLOAT, `p99_delay` FLOAT, `max_delay` FLOAT, `avg_delay` FLOAT, `hostname` VARCHAR(15), `tdengine_commit_id` VARCHAR(50), `tdinternal_commit_id` VARCHAR(50), `load_type` VARCHAR(50), `cpu` FLOAT, `memory` FLOAT, `io_write` FLOAT, `io_read` FLOAT) TAGS ( `branch` VARCHAR(50), `scenario` VARCHAR(50), `test_case` VARCHAR(1000), `env_id` INT, `type` VARCHAR(50)); + +sql CREATE TABLE `job_info` ( `start_time` TIMESTAMP , `finish_time` TIMESTAMP , `job_id` INT, `job_status` VARCHAR(20), `test_type` VARCHAR(50), `environment` INT, `version` VARCHAR(20), `tdengine_commit_id` VARCHAR(50), `tdinternal_commit_id` VARCHAR(50), `type` VARCHAR(50), `scenario` VARCHAR(50), `note` VARCHAR(500), `version_number` VARCHAR(20)); + +sql create view abc as select * from ( select a.job_id, a.start_time as job_start_time, a.finish_time as job_end_time, a.job_status, a.test_type, a.environment, case when a.version_number <> null then a.version else CONCAT(a.version,'_',a.version_number) end as version_info, a.tdengine_commit_id, a.tdinternal_commit_id, a.type, a.scenario, a.note, a.version_number, b.end_time as tc_end_time, b.time_cost, b.write_speed, b.qps, b.min_delay, b.p90_delay, b.p95_delay, b.p99_delay, b.`max_delay`, b.avg_delay, b.hostname, b.load_type, b.scenario, b.test_case, b.type from job_info a, test_results b where a.start_time=b.job_id_ts and a.job_status='finished') s1 inner join resource_info s2 on s1.job_start_time=s2.job_id_ts and s1.job_id=2 and s1.tc_end_time=s2.end_time; + +sql select * from abc; + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 781b644a8c66c0121b0866cd1bb3b16695138faa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 16:34:47 +0800 Subject: [PATCH 10/21] fix(stream):fix memory leak and update test cases. --- source/dnode/mnode/impl/src/mndStream.c | 1 + tests/system-test/2-query/compa4096_tsma.json | 140 ++++++++++-------- tests/system-test/2-query/tsma.py | 11 +- .../system-test/8-stream/stream_multi_agg.py | 2 +- 4 files changed, 84 insertions(+), 70 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 09189bf593..7b00978a62 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1154,6 +1154,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } removeTasksInBuf(pInvalidList, &execInfo); + taosArrayDestroy(pInvalidList); taosThreadMutexUnlock(&execInfo.lock); return ready ? 0 : -1; diff --git a/tests/system-test/2-query/compa4096_tsma.json b/tests/system-test/2-query/compa4096_tsma.json index 66d98ceebe..993eb8ae58 100644 --- a/tests/system-test/2-query/compa4096_tsma.json +++ b/tests/system-test/2-query/compa4096_tsma.json @@ -1,66 +1,80 @@ { - "filetype": "insert", - "cfgdir": "/etc/taos", - "host": "localhost", - "port": 6030, - "rest_port": 6041, - "user": "root", - "password": "taosdata", - "thread_count": 100, - "create_table_thread_count": 24, - "result_file": "taosBenchmark_result.log", - "confirm_parameter_prompt": "no", - "insert_interval": 0, - "num_of_records_per_req": 1000000, - "max_sql_len": 1024000, - "databases": [ - { - "dbinfo": { - "name": "db4096", - "drop": "yes", - "replica": 1, - "duration": 10, - "precision": "ms", - "keep": 3650, - "comp": 2, - "vgroups": 2, - "buffer": 1000 - }, - "super_tables": [ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "localhost", + "port": 6030, + "rest_port": 6041, + "user": "root", + "password": "taosdata", + "thread_count": 100, + "create_table_thread_count": 24, + "result_file": "taosBenchmark_result.log", + "confirm_parameter_prompt": "no", + "insert_interval": 0, + "num_of_records_per_req": 1000000, + "max_sql_len": 1024000, + "databases": [ { - "name": "stb0", - "child_table_exists": "no", - "childtable_count":2, - "childtable_prefix": "ctb0", - "escape_character": "no", - "auto_create_table": "no", - "batch_create_tbl_num": 500, - "data_source": "rand", - "insert_mode": "taosc", - "rollup": null, - "interlace_rows": 0, - "line_protocol": null, - "tcp_transfer": "no", - "insert_rows": 10, - "childtable_limit": 0, - "childtable_offset": 0, - "rows_per_tbl": 0, - "max_sql_len": 1048576, - "disorder_ratio": 0, - "disorder_range": 1000, - "timestamp_step": 1000, - "start_timestamp": "2022-10-22 17:20:36", - "sample_format": "csv", - "sample_file": "./sample.csv", - "tags_file": "", - "columns": [{ "type": "INT","count": 4093}], - "tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}] + "dbinfo": { + "name": "db4096", + "drop": "yes", + "replica": 1, + "duration": 10, + "precision": "ms", + "keep": 3650, + "comp": 2, + "vgroups": 2, + "buffer": 1000 + }, + "super_tables": [ + { + "name": "stb0", + "child_table_exists": "no", + "childtable_count": 2, + "childtable_prefix": "ctb0", + "escape_character": "no", + "auto_create_table": "no", + "batch_create_tbl_num": 500, + "data_source": "rand", + "insert_mode": "taosc", + "rollup": null, + "interlace_rows": 0, + "line_protocol": null, + "tcp_transfer": "no", + "insert_rows": 10, + "childtable_limit": 0, + "childtable_offset": 0, + "rows_per_tbl": 0, + "max_sql_len": 1048576, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 1000, + "start_timestamp": "2022-10-22 17:20:36", + "sample_format": "csv", + "sample_file": "./sample.csv", + "tags_file": "", + "columns": [ + { + "type": "INT", + "count": 4093 + } + ], + "tags": [ + { + "type": "TINYINT", + "count": 1 + }, + { + "type": "NCHAR", + "count": 1 + } + ] + } + ] } - ] - } - ], - "prepare_rand": 10000, - "chinese": "no", - "streams": false, - "test_log": "/root/testlog/" -} + ], + "prepare_rand": 10000, + "chinese": "no", + "streams": false, + "test_log": "/root/testlog/" +} \ No newline at end of file diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index f80aab0e82..d7dc1d24f3 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1270,15 +1270,14 @@ class TDTestCase: def test_drop_tsma(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------') - self.create_tsma('tsma1', 'test', 'meters', [ - 'avg(c1)', 'avg(c2)'], '5m') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters') # drop recursive tsma first tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1319,7 +1318,7 @@ class TDTestCase: 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database nsdb') # drop norm table @@ -1346,7 +1345,7 @@ class TDTestCase: # test drop stream tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1449,7 +1448,7 @@ class TDTestCase: tdSql.error( 'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) - self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-chkpt-u') tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self): diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index acb80f528b..4e8d965f32 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -68,7 +68,7 @@ class TDTestCase: # create stream tdSql.execute("use db", queryTimes=100) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) - time.sleep(5) + time.sleep(10) sql = "select count(*) from sta" # loop wait max 60s to check count is ok From b22679c9418c1733c01eb036cf28bad05edbebe8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 17:13:44 +0800 Subject: [PATCH 11/21] fix(stream): do NOT remove the stream if the stream object is not accessiable, since the deploy trans may not completed yet. --- source/dnode/mnode/impl/src/mndStream.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7b00978a62..136839451f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1115,30 +1115,18 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { continue; } - // for stopped stream task entry, we do additional check for it if (pEntry->status == TASK_STATUS__STOP) { - bool invalid = false; for(int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { STaskId* pId = taosArrayGet(pInvalidList, j); if (pEntry->id.streamId == pId->streamId) { taosArrayPush(pInvalidList, &pEntry->id); - invalid = true; break; } } - - if (!invalid) { - SStreamObj *pObj = mndGetStreamObj(pMnode, pEntry->id.streamId); - if (pObj == NULL) { - taosArrayPush(pInvalidList, &pEntry->id); - } else { - mndReleaseStream(pMnode, pObj); - } - } } if (pEntry->status != TASK_STATUS__READY) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", + mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; } From 5b0bb82ed046b97972adcaed59bc0820483be78d Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 11 Jun 2024 17:46:53 +0800 Subject: [PATCH 12/21] fix: reserve log file name --- source/util/src/tlog.c | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 0c09174970..1dad1d9519 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -267,21 +267,31 @@ static void taosUnLockLogFile(TdFilePtr pFile) { } } -static void taosKeepOldLog(char *oldName) { - if (tsLogKeepDays == 0) return; - - int64_t fileSec = taosGetTimestampSec(); - char fileName[LOG_FILE_NAME_LEN + 20]; - snprintf(fileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec); - - (void)taosRenameFile(oldName, fileName); - - char compressFileName[LOG_FILE_NAME_LEN + 20]; - snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64 ".gz", tsLogObj.logName, fileSec); - if (taosCompressFile(fileName, compressFileName) == 0) { - (void)taosRemoveFile(fileName); +static void taosReserveOldLog(char *oldName, char *keepName) { + if (tsLogKeepDays == 0) { + keepName[0] = 0; + return; } + int32_t code = 0; + int64_t fileSec = taosGetTimestampSec(); + snprintf(keepName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec); + if ((code = taosRenameFile(oldName, keepName))) { + keepName[0] = 0; + uError("failed to rename file:%s to %s since %s", oldName, keepName, tstrerror(code)); + } +} + +static void taosKeepOldLog(char *oldName) { + if (oldName[0] == '\0') goto _end; + + char compressFileName[LOG_FILE_NAME_LEN + 20]; + snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.gz", oldName); + if (taosCompressFile(oldName, compressFileName) == 0) { + (void)taosRemoveFile(oldName); + } + +_end: if (tsLogKeepDays > 0) { taosRemoveOldFiles(tsLogDir, tsLogKeepDays); } @@ -316,13 +326,13 @@ static OldFileKeeper *taosOpenNewFile() { tsLogObj.logHandle->pFile = pFile; tsLogObj.lines = 0; tsLogObj.openInProgress = 0; - OldFileKeeper* oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper)); + OldFileKeeper *oldFileKeeper = taosMemoryMalloc(sizeof(OldFileKeeper)); if (oldFileKeeper == NULL) { uError("create old log keep info faild! mem is not enough."); return NULL; } oldFileKeeper->pOldFile = pOldFile; - memcpy(oldFileKeeper->keepName, keepName, LOG_FILE_NAME_LEN + 20); + taosReserveOldLog(keepName, oldFileKeeper->keepName); uInfo(" new log file:%d is opened", tsLogObj.flag); uInfo("=================================="); From 581f5e79d9a539d92f32a2eb5d177beb9bc14a36 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 11 Jun 2024 18:20:00 +0800 Subject: [PATCH 13/21] fix: reserve log file name --- source/util/src/tlog.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 1dad1d9519..ca809e1d5d 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -66,6 +66,7 @@ typedef struct { int32_t lines; int32_t flag; int32_t openInProgress; + int64_t lastFileSec; pid_t pid; char logName[LOG_FILE_NAME_LEN]; SLogBuff *logHandle; @@ -275,6 +276,11 @@ static void taosReserveOldLog(char *oldName, char *keepName) { int32_t code = 0; int64_t fileSec = taosGetTimestampSec(); + if (tsLogObj.lastFileSec < fileSec) { + tsLogObj.lastFileSec = fileSec; + } else { + fileSec = ++tsLogObj.lastFileSec; + } snprintf(keepName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec); if ((code = taosRenameFile(oldName, keepName))) { keepName[0] = 0; From f1fa948594b4d85fd2fd89304271961d4aa913d9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 11 Jun 2024 18:27:46 +0800 Subject: [PATCH 14/21] fix: reserve log file name --- source/util/src/tlog.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index ca809e1d5d..e2a5d39888 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -66,7 +66,7 @@ typedef struct { int32_t lines; int32_t flag; int32_t openInProgress; - int64_t lastFileSec; + int64_t lastKeepFileSec; pid_t pid; char logName[LOG_FILE_NAME_LEN]; SLogBuff *logHandle; @@ -276,10 +276,10 @@ static void taosReserveOldLog(char *oldName, char *keepName) { int32_t code = 0; int64_t fileSec = taosGetTimestampSec(); - if (tsLogObj.lastFileSec < fileSec) { - tsLogObj.lastFileSec = fileSec; + if (tsLogObj.lastKeepFileSec < fileSec) { + tsLogObj.lastKeepFileSec = fileSec; } else { - fileSec = ++tsLogObj.lastFileSec; + fileSec = ++tsLogObj.lastKeepFileSec; } snprintf(keepName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64, tsLogObj.logName, fileSec); if ((code = taosRenameFile(oldName, keepName))) { From a41a04dc37d4445f69ed3be983818e17c7666944 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 18:35:59 +0800 Subject: [PATCH 15/21] fix(stream): remove the related history task correctly. --- include/common/tmsg.h | 2 -- include/libs/stream/tstream.h | 3 +++ source/dnode/vnode/src/tqCommon/tqCommon.c | 9 +++++---- source/libs/stream/src/streamCheckpoint.c | 12 +++++++----- source/libs/stream/src/streamMeta.c | 11 +++++++++++ 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 98ca414466..387df52f16 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3473,8 +3473,6 @@ typedef struct SVUpdateCheckpointInfoReq { int64_t checkpointTs; int32_t transId; int8_t dropRelHTask; - int64_t hStreamId; - int64_t hTaskId; } SVUpdateCheckpointInfoReq; typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1996c2ed63..bcf081dbfb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -734,6 +734,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32 void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); +void streamMetaClearUpdateTaskList(SStreamMeta* pMeta); +void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); + void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7c8ad159bc..44aef4a91e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -185,7 +185,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if (pMeta->updateInfo.transId != req.transId) { + if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) { if (req.transId < pMeta->updateInfo.transId) { tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, pMeta->updateInfo.transId, req.transId); @@ -197,13 +197,12 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, vgId, req.transId, pMeta->updateInfo.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = req.transId; + streamMetaInitUpdateTaskList(pMeta, req.transId); } } else { tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); + streamMetaInitUpdateTaskList(pMeta, req.transId); } // duplicate update epset msg received, discard this redundant message @@ -280,6 +279,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // persist to disk } + streamMetaClearUpdateTaskList(pMeta); + if (!restored) { tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); pMeta->startInfo.tasksWillRestart = 0; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1583734a99..6696c9f8c2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -418,6 +418,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; + STaskId hTaskId = {0}; taosThreadMutexLock(&pTask->lock); @@ -433,12 +434,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } - streamMetaWLock(pMeta); } @@ -476,8 +476,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin } if (pReq->dropRelHTask) { + hTaskId = pTask->hTaskInfo.id; stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", - pReq->taskId, vgId, pReq->hTaskId); + pReq->taskId, vgId, hTaskId.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } @@ -498,9 +499,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, + (int32_t)hTaskId.taskId, numOfTasks); } streamMetaWLock(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f6449829a3..da1fec5565 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -372,6 +372,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; + pMeta->updateInfo.transId = -1; pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -1740,4 +1741,14 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", id, vgId, transId, el); } +} + +void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) { + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = -1; +} + +void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = transId; } \ No newline at end of file From 9406be9aa681738d78117efb4c47da05f2939b31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 18:58:16 +0800 Subject: [PATCH 16/21] fix(stream): remove invalid reset. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 44aef4a91e..cabccfc0c8 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -202,7 +202,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } } else { tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); - streamMetaInitUpdateTaskList(pMeta, req.transId); } // duplicate update epset msg received, discard this redundant message From 90e54d0603d970e738b79a48f7d31b39f1a2f01b Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 11 Jun 2024 19:38:31 +0800 Subject: [PATCH 17/21] test case: mem overflow --- source/dnode/mnode/impl/test/func/func.cpp | 27 ++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/test/func/func.cpp b/source/dnode/mnode/impl/test/func/func.cpp index ee60556639..0fc903f5db 100644 --- a/source/dnode/mnode/impl/test/func/func.cpp +++ b/source/dnode/mnode/impl/test/func/func.cpp @@ -169,7 +169,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f1"); + char name[TSDB_FUNC_NAME_LEN] = "f1"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -220,7 +221,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { retrieveReq.numOfFuncs = TSDB_FUNC_MAX_RETRIEVE + 1; retrieveReq.pFuncNames = taosArrayInit(TSDB_FUNC_MAX_RETRIEVE + 1, TSDB_FUNC_NAME_LEN); for (int32_t i = 0; i < TSDB_FUNC_MAX_RETRIEVE + 1; ++i) { - taosArrayPush(retrieveReq.pFuncNames, "1"); + char name[TSDB_FUNC_NAME_LEN] = "1"; + taosArrayPush(retrieveReq.pFuncNames, name); } int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); @@ -237,7 +239,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); + char name[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -279,7 +282,8 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); + char name[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -316,8 +320,10 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 2; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); - taosArrayPush(retrieveReq.pFuncNames, "f1"); + char name1[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name1); + char name2[TSDB_FUNC_NAME_LEN] = "f1"; + taosArrayPush(retrieveReq.pFuncNames, name2); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -367,8 +373,10 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 2; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "f2"); - taosArrayPush(retrieveReq.pFuncNames, "f3"); + char name1[TSDB_FUNC_NAME_LEN] = "f2"; + taosArrayPush(retrieveReq.pFuncNames, name1); + char name2[TSDB_FUNC_NAME_LEN] = "f3"; + taosArrayPush(retrieveReq.pFuncNames, name2); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); @@ -483,7 +491,8 @@ TEST_F(MndTestFunc, 05_Actual_code) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, "udf1"); + char name[TSDB_FUNC_NAME_LEN] = "udf1"; + taosArrayPush(retrieveReq.pFuncNames, name); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void* pReq = rpcMallocCont(contLen); From 80eb88d6ac9c7eebaa2a82ce017e4c591fb713d8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jun 2024 19:54:38 +0800 Subject: [PATCH 18/21] fix(stream): wait for more 5 sec. --- tests/system-test/8-stream/stream_multi_agg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index 4e8d965f32..32c2648b46 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -42,7 +42,7 @@ class TDTestCase: tdSql.execute("use test", queryTimes=100) tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") tdLog.debug("========create stream and insert data ok========") - time.sleep(15) + time.sleep(20) tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") rowCnt = tdSql.getRows() From db44febadf551982d6c9c54096d5150e331f94ab Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 12 Jun 2024 08:27:46 +0800 Subject: [PATCH 19/21] fix: reserve log file name --- source/util/src/tlog.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index e2a5d39888..8023541af3 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -289,15 +289,14 @@ static void taosReserveOldLog(char *oldName, char *keepName) { } static void taosKeepOldLog(char *oldName) { - if (oldName[0] == '\0') goto _end; - - char compressFileName[LOG_FILE_NAME_LEN + 20]; - snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.gz", oldName); - if (taosCompressFile(oldName, compressFileName) == 0) { - (void)taosRemoveFile(oldName); + if (oldName[0] != 0) { + char compressFileName[LOG_FILE_NAME_LEN + 20]; + snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.gz", oldName); + if (taosCompressFile(oldName, compressFileName) == 0) { + (void)taosRemoveFile(oldName); + } } -_end: if (tsLogKeepDays > 0) { taosRemoveOldFiles(tsLogDir, tsLogKeepDays); } From dc2497791a89d99b6efaf657f635bec3c707aa4b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 12 Jun 2024 10:11:55 +0800 Subject: [PATCH 20/21] fix(stream): accept invalid vgroup id error code when handling the update checkpoint info. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 43 ++++++++++---------- source/dnode/mnode/impl/src/mndStreamTrans.c | 12 +++--- source/dnode/mnode/impl/src/mndStreamUtil.c | 14 +++---- source/os/src/osTimer.c | 5 ++- 5 files changed, 40 insertions(+), 36 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 9289909b19..dade7b4bdf 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -110,7 +110,7 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); + int32_t retryCode, int32_t acceptCode); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 136839451f..c9598c4b38 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -515,7 +515,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); + int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0); if (code != 0) { taosMemoryFree(buf); return -1; @@ -959,7 +959,7 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask return -1; } - code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0); if (code != 0) { taosMemoryFree(buf); } @@ -2554,27 +2554,28 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs - mInfo("stream:0x%" PRIx64 - " %s all %d tasks send checkpoint-report, start to update checkpoint meta-info for checkpointId:%" PRId64, + mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 + " will be issued soon", req.streamId, pStream->name, total, req.checkpointId); - if (pStream != NULL) { - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); - if (conflict) { - mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); - } else { - int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); - if (code == TSDB_CODE_SUCCESS) { // remove this entry - taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); - - int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, numOfStreams); - } else { - mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", - req.streamId); - } - } - } + // if (pStream != NULL) { + // bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + // if (conflict) { + // mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); + // } else { + // int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); + // if (code == TSDB_CODE_SUCCESS) { // remove this entry + // taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + // + // int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + // mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, + // numOfStreams); + // } else { + // mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + // req.streamId); + // } + // } + // } } if (pStream != NULL) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 8137081631..0daa383d3e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -127,7 +127,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p return false; } -int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { +int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamId) { taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { @@ -136,12 +136,13 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { } mndStreamClearFinishedTrans(pMnode, NULL); - SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); + SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; taosThreadMutexUnlock(&execInfo.lock); - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { + if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 || + strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) { return tInfo.transId; } } else { @@ -246,8 +247,9 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) } int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { - STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; + int32_t retryCode, int32_t acceptCode) { + STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode, + .acceptableCode = acceptCode}; return mndTransAppendRedoAction(pTrans, &action); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 6745acac34..a0731833e6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -230,7 +230,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT return -1; } - code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -308,7 +308,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); - code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -356,7 +356,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -400,7 +400,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -484,7 +484,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask return code; } - code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID); + code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID, 0); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); } @@ -534,7 +534,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa return code; } - code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); } @@ -727,7 +727,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas return code; } - code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); } diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index b8eb54e10e..36d364382a 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -86,8 +86,9 @@ static void taosDeleteTimer(void *tharg) { static TdThread timerThread; static timer_t timerId; static volatile bool stopTimer = false; -static void *taosProcessAlarmSignal(void *tharg) { - // Block the signal + +static void *taosProcessAlarmSignal(void *tharg) { + // Block the signal sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); From fa05b2c8b634c359726d6f20e7460b4cd60418df Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 12 Jun 2024 11:17:08 +0800 Subject: [PATCH 21/21] fix: reserve log file name --- source/util/src/tlog.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 8023541af3..b224eac8c2 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -269,7 +269,7 @@ static void taosUnLockLogFile(TdFilePtr pFile) { } static void taosReserveOldLog(char *oldName, char *keepName) { - if (tsLogKeepDays == 0) { + if (tsLogKeepDays <= 0) { keepName[0] = 0; return; }