From b27b635c104808c259fe7316d71f1b0380e24496 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 13:55:06 +0800 Subject: [PATCH 1/8] fix(stream): allow hb message rsp. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ab519e2b4b..b35f401cb9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1160,7 +1160,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } tEncoderClear(&encoder); - SRpcMsg msg = {.info.noResp = 1}; + SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); pMeta->pHbInfo->hbCount += 1; From b361eb4a7c6ac93a5efd0a7bdf619d9c271b58ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 15:16:07 +0800 Subject: [PATCH 2/8] fix(stream): add hb rsp handle function for vnode and snode. --- include/dnode/vnode/tqCommon.h | 1 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/snode/src/snode.c | 2 ++ source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 5 +++++ source/dnode/vnode/src/tqCommon/tqCommon.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 ++ 8 files changed, 18 insertions(+), 1 deletion(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index fc9b88340f..84d0dd4982 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -26,6 +26,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 1488df3cb1..8c726f9f03 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -88,6 +88,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a2f0b7aced..4213541351 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -837,6 +837,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 9a017e7074..34623c021b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -178,6 +178,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_HEARTBEAT_RSP: + return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fb6a86843c..b83af84086 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -242,6 +242,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6b7420b55f..a3fd19c7f8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1220,3 +1220,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); } + +// this function is needed, do not try to remove it. +int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3b7fe3cfdd..c8a4f2a288 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -936,4 +936,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); -} \ No newline at end of file +} + +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e4d7b11176..86355cdbb0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -796,6 +796,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskScanHistory(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_HEARTBEAT_RSP: + return tqProcessStreamHbRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; From 8740c3cb4a215a2aeeebca4a137624f8c1d05ca9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 16:17:06 +0800 Subject: [PATCH 3/8] fix(stream): send hb rsp explicitly. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4e0c76f6de..e9a8820c0c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -29,7 +29,7 @@ #define MND_STREAM_MAX_NUM 60 -typedef struct SMStreamNodeCheckMsg { +typedef struct { int8_t placeHolder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 5de442951c..e94daf6c93 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,6 +16,10 @@ #include "mndStream.h" #include "mndTrans.h" +typedef struct { + int8_t placeholder; // placeholder +} SMStreamHbRspMsg; + typedef struct SFailedCheckpointInfo { int64_t streamUid; int64_t checkpointId; @@ -222,11 +226,11 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); - SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); + SArray *pFailedTasks = NULL; + SArray *pOrphanTasks = NULL; - if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){ - if(suspendAllStreams(pMnode, &pReq->info) < 0){ + if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { + if (suspendAllStreams(pMnode, &pReq->info) < 0) { return -1; } } @@ -244,6 +248,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); + taosThreadMutexLock(&execInfo.lock); // extract stream task list @@ -349,5 +356,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosArrayDestroy(pFailedTasks); taosArrayDestroy(pOrphanTasks); + { + pReq->info.handle = NULL; // disable auto rsp + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + tmsgSendRsp(&rsp); + } + return TSDB_CODE_SUCCESS; } From 9739b9feb6a6fd6f827ecf455d0a31763a3bc08d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 21 Feb 2024 16:22:51 +0800 Subject: [PATCH 4/8] set group id of delete range --- source/libs/executor/src/streamcountwindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 706b4c5a01..080f9d4e2b 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -164,7 +164,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt } else { pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey); } - SSessionKey tmpKey = {0}; + SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN}; int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); if (code != TSDB_CODE_SUCCESS) { pAggSup->stateStore.streamStateFreeCur(pCur); From 6f8a54f27b155e5421c703e5630f1ed47b289588 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 16:27:52 +0800 Subject: [PATCH 5/8] fix(stream): free dummy rsp msg. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c8a4f2a288..4e49091da4 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -939,5 +939,8 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { } int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return TSDB_CODE_SUCCESS; } From 4440243b1766057949dd7581fa5e9f0093c34a0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 18:17:31 +0800 Subject: [PATCH 6/8] fix(stream): clear the handle after send msg. --- source/dnode/mnode/impl/src/mndStreamHb.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index e94daf6c93..f9727d4fd7 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -357,10 +357,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosArrayDestroy(pOrphanTasks); { - pReq->info.handle = NULL; // disable auto rsp SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; rsp.pCont = rpcMallocCont(rsp.contLen); tmsgSendRsp(&rsp); + + pReq->info.handle = NULL; // disable auto rsp } return TSDB_CODE_SUCCESS; From bad373038709dcd295fbc3ff86b7fa715de6f530 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Feb 2024 18:25:35 +0800 Subject: [PATCH 7/8] fix(stream): set the vgId in hb rsp msg. --- source/dnode/mnode/impl/src/mndStreamHb.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index f9727d4fd7..fe8a287eaf 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -17,7 +17,7 @@ #include "mndTrans.h" typedef struct { - int8_t placeholder; // placeholder + SMsgHead head; } SMStreamHbRspMsg; typedef struct SFailedCheckpointInfo { @@ -359,6 +359,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { { SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.vgId); + tmsgSendRsp(&rsp); pReq->info.handle = NULL; // disable auto rsp From b629c27691f3e0a8350076064d7e7901ae1183e0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 21 Feb 2024 10:53:21 +0000 Subject: [PATCH 8/8] fix invalid read --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e2b69dd145..2370efa460 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -961,7 +961,7 @@ static void cliSendCb(uv_write_t* req, int status) { tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost); } } - if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) { + if (pMsg != NULL && pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) { rpcFreeCont(pMsg->msg.pCont); pMsg->msg.pCont = 0; }