From 5e130c4d538e9b27b48217b3bef31650b0daf33b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Jan 2025 09:17:08 +0800 Subject: [PATCH 1/7] enh(stream): add ctrl queue to handle hb rsp, to avoid hb rsp not being confirmed if all stream threads are occupied. --- include/common/tmsgcb.h | 1 + source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 3 ++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 4 ++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 49 ++++++++++++++++++++- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 22 +++++++-- 7 files changed, 77 insertions(+), 5 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 6752287ed1..c934cb6961 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -37,6 +37,7 @@ typedef enum { SYNC_RD_QUEUE, STREAM_QUEUE, ARB_QUEUE, + STREAM_CTRL_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 164be636a9..84f5149624 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -32,6 +32,7 @@ typedef struct SVnodeMgmt { const char *name; SQueryAutoQWorkerPool queryPool; SAutoQWorkerPool streamPool; + SWWorkerPool streamCtrlPool; SWWorkerPool fetchPool; SSingleWorker mgmtWorker; SSingleWorker mgmtMultiWorker; @@ -73,6 +74,7 @@ typedef struct { SMultiWorker pApplyW; STaosQueue *pQueryQ; STaosQueue *pStreamQ; + STaosQueue *pStreamCtrlQ; STaosQueue *pFetchQ; STaosQueue *pMultiMgmQ; } SVnodeObj; @@ -134,6 +136,7 @@ int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0f0dbfb3f1..927a679056 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1022,7 +1022,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6d5e117181..4400d7fac0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -395,9 +395,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); tqNotifyClose(pVnode->pImpl->pTq); + dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ); + while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10); + dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); dInfo("vgId:%d, post close", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 334c213945..14b4b0ec93 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -137,6 +137,34 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) { + SVnodeObj *pVnode = pInfo->ahandle; + void *pItem = NULL; + int32_t code = 0; + + while (1) { + if (taosGetQitem(pQall, &pItem) == 0) { + break; + } + + SRpcMsg *pMsg = pItem; + const STraceId *trace = &pMsg->info.traceId; + + dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg); + code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + terrno = code; + dGError("vgId:%d, msg:%p failed to process stream ctrl msg %s since %s", pVnode->vgId, pMsg, + TMSG_INFO(pMsg->msgType), tstrerror(code)); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; SRpcMsg *pMsg = NULL; @@ -245,6 +273,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pStreamQ, pMsg); break; + case STREAM_CTRL_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg); + code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg); + break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); @@ -301,6 +333,8 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); } +int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); } + int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); @@ -373,6 +407,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { case STREAM_QUEUE: size = taosQueueItemSize(pVnode->pStreamQ); break; + case STREAM_CTRL_QUEUE: + size = taosQueueItemSize(pVnode->pStreamCtrlQ); default: break; } @@ -417,9 +453,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); + pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue); if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || - pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) { + pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL + || pVnode->pStreamCtrlQ == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -435,15 +473,19 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, taosQueueGetThreadId(pVnode->pFetchQ)); dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); + dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ, + taosQueueGetThreadId(pVnode->pStreamCtrlQ)); return 0; } void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); + tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; pVnode->pStreamQ = NULL; + pVnode->pStreamCtrlQ = NULL; pVnode->pFetchQ = NULL; dDebug("vgId:%d, queue is freed", pVnode->vgId); } @@ -463,6 +505,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pStreamPool->ratio = tsRatioOfVnodeStreamThreads; if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code; + SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool; + pStreamCtrlPool->name = "vnode-ctrl-stream"; + pStreamCtrlPool->max = 1; + if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; + SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->max = tsNumOfVnodeFetchThreads; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 475f8c9814..50d75c4838 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -112,6 +112,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c7b0caf286..b8b58ceae5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -930,7 +930,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { } int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { - vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); + vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { @@ -963,8 +963,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_HEARTBEAT_RSP: - return tqProcessStreamHbRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: @@ -977,6 +975,24 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) } } +int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("vgId:%d, msg:%p in stream ctrl queue is processing", pVnode->config.vgId, pMsg); + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || + pMsg->msgType == TDMT_VND_BATCH_META) && + !syncIsReadyForRead(pVnode->sync)) { + vnodeRedirectRpcMsg(pVnode, pMsg, terrno); + return 0; + } + + switch (pMsg->msgType) { + case TDMT_MND_STREAM_HEARTBEAT_RSP: + return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + default: + vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); + return TSDB_CODE_APP_ERROR; + } +} + void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); if (code) { From fa2229c45320760d6b67a6ab59cf071756f446bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Jan 2025 18:50:17 +0800 Subject: [PATCH 2/7] refactor(stream): dispatch rsp handled in ctrl queue. --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 927a679056..3829b3204c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1007,7 +1007,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b8b58ceae5..078ab6a876 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -943,8 +943,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_DISPATCH_RSP: - return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return tqProcessTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: @@ -987,6 +985,8 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn switch (pMsg->msgType) { case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_DISPATCH_RSP: + return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; From 0f04f473377e3cf4826cdeb21192983aecc4f0b6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Jan 2025 19:03:31 +0800 Subject: [PATCH 3/7] refactor(stream): dispatch req handled in ctrl queue. --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 3829b3204c..9d0e553b40 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1006,7 +1006,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 078ab6a876..f5a028f8c3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -941,8 +941,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_DISPATCH: - return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return tqProcessTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: @@ -985,6 +983,8 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn switch (pMsg->msgType) { case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_DISPATCH: + return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); default: From a5cdf3320c84cd8c95375c19f993687ddc3e4e2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Jan 2025 01:00:22 +0800 Subject: [PATCH 4/7] refactor(stream): handle stream check in ctrl queue. --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9d0e553b40..3677fc5616 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1010,8 +1010,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f5a028f8c3..43b3d8676d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -941,10 +941,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_TASK_CHECK: - return tqProcessTaskCheckReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_TASK_CHECK_RSP: - return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: @@ -987,6 +983,10 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TASK_CHECK: + return tqProcessTaskCheckReq(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TASK_CHECK_RSP: + return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; From 68faa300af68e73fe52d20ff7c59cfb0132219ed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Feb 2025 11:11:22 +0800 Subject: [PATCH 5/7] fix(stream): fix memory leak. --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 14b4b0ec93..b398bdf242 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -541,6 +541,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { void vmStopWorker(SVnodeMgmt *pMgmt) { tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tAutoQWorkerCleanup(&pMgmt->streamPool); + tWWorkerCleanup(&pMgmt->streamCtrlPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); } From 3ff56696aebcd4512c8fb5cec8a7e4e85ab2ac53 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Fri, 7 Feb 2025 11:40:56 +0800 Subject: [PATCH 6/7] fix: exit if branch checkout fails --- cmake/taosws_CMakeLists.txt.in | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index b013d45911..dc32dead52 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE @@ -10,4 +10,13 @@ ExternalProject_Add(taosws-rs BUILD_COMMAND "" INSTALL_COMMAND "" TEST_COMMAND "" + # Fix: Exit if branch checkout fails + UPDATE_COMMAND " + cd ${TD_SOURCE_DIR}/tools/taosws-rs && + git checkout -f 3.0 + if [ $(git rev-parse --abbrev-ref HEAD) != '3.0' ]; then + echo 'Failed to switch to branch 3.0 in ${TD_SOURCE_DIR}/tools/taosws-rs'; + exit 1; + fi + " ) From 4edd569591f834b89d69fa157a700bda2646c035 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Fri, 7 Feb 2025 11:54:34 +0800 Subject: [PATCH 7/7] Revert "fix: exit if branch checkout fails" This reverts commit 3ff56696aebcd4512c8fb5cec8a7e4e85ab2ac53. --- cmake/taosws_CMakeLists.txt.in | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index dc32dead52..b013d45911 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG 3.0 + GIT_TAG main SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE @@ -10,13 +10,4 @@ ExternalProject_Add(taosws-rs BUILD_COMMAND "" INSTALL_COMMAND "" TEST_COMMAND "" - # Fix: Exit if branch checkout fails - UPDATE_COMMAND " - cd ${TD_SOURCE_DIR}/tools/taosws-rs && - git checkout -f 3.0 - if [ $(git rev-parse --abbrev-ref HEAD) != '3.0' ]; then - echo 'Failed to switch to branch 3.0 in ${TD_SOURCE_DIR}/tools/taosws-rs'; - exit 1; - fi - " )