From d34422824214af1aedcbc72f0d361488b5ae280f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Jan 2025 19:11:41 +0800 Subject: [PATCH 1/8] enh(stream): reduce the in-queue size to be 10MiB. --- source/libs/stream/inc/streamInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7af64c041d..bb0b5ee89b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -38,7 +38,7 @@ extern "C" { #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_QUEUE_CAPACITY 5120 -#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) +#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (10) // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) From 30039bb53b771b7d0b2a6a7ee98666762f20ad61 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Jan 2025 09:17:08 +0800 Subject: [PATCH 2/8] enh(stream): add ctrl queue to handle hb rsp, to avoid hb rsp not being confirmed if all stream threads are occupied. 
--- include/common/tmsgcb.h | 1 + source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 3 ++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 4 ++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 49 ++++++++++++++++++++- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 22 +++++++-- 7 files changed, 77 insertions(+), 5 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 6752287ed1..c934cb6961 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -37,6 +37,7 @@ typedef enum { SYNC_RD_QUEUE, STREAM_QUEUE, ARB_QUEUE, + STREAM_CTRL_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 164be636a9..84f5149624 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -32,6 +32,7 @@ typedef struct SVnodeMgmt { const char *name; SQueryAutoQWorkerPool queryPool; SAutoQWorkerPool streamPool; + SWWorkerPool streamCtrlPool; SWWorkerPool fetchPool; SSingleWorker mgmtWorker; SSingleWorker mgmtMultiWorker; @@ -73,6 +74,7 @@ typedef struct { SMultiWorker pApplyW; STaosQueue *pQueryQ; STaosQueue *pStreamQ; + STaosQueue *pStreamCtrlQ; STaosQueue *pFetchQ; STaosQueue *pMultiMgmQ; } SVnodeObj; @@ -134,6 +136,7 @@ int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0f0dbfb3f1..927a679056 
100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1022,7 +1022,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6d5e117181..4400d7fac0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -395,9 +395,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); tqNotifyClose(pVnode->pImpl->pTq); + dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ); + while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10); + dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); dInfo("vgId:%d, post close", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index c22adec9b4..4b278d533d 100644 --- 
a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -137,6 +137,34 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) { + SVnodeObj *pVnode = pInfo->ahandle; + void *pItem = NULL; + int32_t code = 0; + + while (1) { + if (taosGetQitem(pQall, &pItem) == 0) { + break; + } + + SRpcMsg *pMsg = pItem; + const STraceId *trace = &pMsg->info.traceId; + + dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg); + code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + terrno = code; + dGError("vgId:%d, msg:%p failed to process stream ctrl msg %s since %s", pVnode->vgId, pMsg, + TMSG_INFO(pMsg->msgType), tstrerror(code)); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; SRpcMsg *pMsg = NULL; @@ -245,6 +273,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pStreamQ, pMsg); break; + case STREAM_CTRL_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg); + code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg); + break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); @@ -301,6 +333,8 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); } +int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg 
*pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); } + int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); @@ -373,6 +407,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { case STREAM_QUEUE: size = taosQueueItemSize(pVnode->pStreamQ); break; + case STREAM_CTRL_QUEUE: + size = taosQueueItemSize(pVnode->pStreamCtrlQ); default: break; } @@ -417,9 +453,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); + pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue); if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || - pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) { + pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL + || pVnode->pStreamCtrlQ == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -435,15 +473,19 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ, taosQueueGetThreadId(pVnode->pFetchQ)); dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ); + dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ, + taosQueueGetThreadId(pVnode->pStreamCtrlQ)); return 0; } void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); 
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); + tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; pVnode->pStreamQ = NULL; + pVnode->pStreamCtrlQ = NULL; pVnode->pFetchQ = NULL; dDebug("vgId:%d, queue is freed", pVnode->vgId); } @@ -463,6 +505,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pStreamPool->ratio = tsRatioOfVnodeStreamThreads; if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code; + SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool; + pStreamCtrlPool->name = "vnode-ctrl-stream"; + pStreamCtrlPool->max = 1; + if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; + SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->max = tsNumOfVnodeFetchThreads; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b33bdb0976..3052d0042c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -112,6 +112,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4eac1cd5c9..708a675353 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -921,7 +921,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { } int32_t 
vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { - vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); + vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { @@ -954,8 +954,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_HEARTBEAT_RSP: - return tqProcessStreamHbRsp(pVnode->pTq, pMsg); case TDMT_MND_STREAM_REQ_CHKPT_RSP: return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: @@ -968,6 +966,24 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) } } +int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("vgId:%d, msg:%p in stream ctrl queue is processing", pVnode->config.vgId, pMsg); + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || + pMsg->msgType == TDMT_VND_BATCH_META) && + !syncIsReadyForRead(pVnode->sync)) { + vnodeRedirectRpcMsg(pVnode, pMsg, terrno); + return 0; + } + + switch (pMsg->msgType) { + case TDMT_MND_STREAM_HEARTBEAT_RSP: + return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + default: + vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); + return TSDB_CODE_APP_ERROR; + } +} + void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); if (code) { From 3223cc0287561e35fbd27deb0b54d08d1b03215c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Jan 2025 18:37:05 +0800 
Subject: [PATCH 3/8] other: add todo --- source/dnode/vnode/src/tqCommon/tqCommon.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 06b7b33cd8..6417023779 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1179,6 +1179,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t return code; } +// todo: add test case for invalid rsp for resume: injection error for always return error int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; From 737138856c8a2cd7832715cb11922f18a9f54850 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jan 2025 11:29:17 +0800 Subject: [PATCH 4/8] refactor(stream): track the msgId for each upstream task. --- include/common/streamMsg.h | 1 + source/libs/stream/src/streamDispatch.c | 12 +++++++++++- source/libs/stream/src/streamTask.c | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index d410bd17e0..85646a328a 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -41,6 +41,7 @@ typedef struct SStreamUpstreamEpInfo { SEpSet epSet; bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer + int64_t lastMsgId; } SStreamUpstreamEpInfo; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42d7f44b62..ee2d337ff2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1842,6 +1842,9 @@ int32_t
streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return TSDB_CODE_STREAM_TASK_NOT_EXIST; } + stDebug("s-task:%s lastMsgId:%"PRId64 " for upstream taskId:0x%x(vgId:%d)", id, pInfo->lastMsgId, pReq->upstreamTaskId, + pReq->upstreamNodeId); + if (pMeta->role == NODE_ROLE_FOLLOWER) { stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); status = TASK_INPUT_STATUS__REFUSED; @@ -1866,7 +1869,14 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId); } - status = streamTaskAppendInputBlocks(pTask, pReq); + if (pReq->msgId > pInfo->lastMsgId) { + status = streamTaskAppendInputBlocks(pTask, pReq); + pInfo->lastMsgId = pReq->msgId; + } else { + stWarn("s-task:%s duplicate msgId:%d from upstream:0x%x, from vgId:%d already recv msgId:%" PRId64, id, + pReq->msgId, pReq->upstreamTaskId, pReq->srcVgId, pInfo->lastMsgId); + status = TASK_INPUT_STATUS__NORMAL; // still return success + } } } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d27ed520c6..dd33ee9613 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -100,6 +100,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { pEpInfo->nodeId = pTask->info.nodeId; pEpInfo->taskId = pTask->id.taskId; pEpInfo->stage = -1; + pEpInfo->lastMsgId = -1; return pEpInfo; } From a3eea6fe1c76ec068592e3fc907b1e342b0802ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jan 2025 14:11:15 +0800 Subject: [PATCH 5/8] fix(stream): fix race condition. 
--- source/libs/stream/src/streamDispatch.c | 139 +++++++++++--------- source/libs/stream/src/streamStartHistory.c | 2 - 2 files changed, 76 insertions(+), 65 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ee2d337ff2..3d979f3d11 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -527,6 +527,76 @@ static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) { streamTaskFreeRefId(param); } +static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) { + int32_t code = 0; + const char* id = pTask->id.idStr; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + + streamMutexLock(&pMsgInfo->lock); + + int32_t msgId = pMsgInfo->msgId; + SStreamDispatchReq* pReq = pTask->msgInfo.pData; + + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId, + msgId); + + int32_t numOfRetry = 0; + for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); + if (pEntry == NULL) { + continue; + } + + if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { + continue; + } + + // downstream not rsp yet beyond threshold that is 10s + if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data + doSendFailedDispatch(pTask, pEntry, now, "timeout"); + numOfRetry += 1; + continue; + } + + // downstream inputQ is closed + if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { + doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); + numOfRetry += 1; + continue; + } + + // handle other errors + if (pEntry->status != TSDB_CODE_SUCCESS) { + doSendFailedDispatch(pTask, pEntry, now, "downstream error"); + numOfRetry += 1; + } + } + + stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d 
vnodes, msgId:%d", pTask->id.idStr, numOfRetry, + msgId); + } else { + int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + + int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); + if (pEntry != NULL) { + setResendInfo(pEntry, now); + code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, + pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); + } else { + stError("s-task:%s invalid index 0, size:%d", id, s); + } + } + + streamMutexUnlock(&pMsgInfo->lock); + return code; +} + static void doMonitorDispatchData(void* param, void* tmrId) { int32_t code = 0; int64_t now = taosGetTimestampMs(); @@ -590,65 +660,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { return; } - { - SStreamDispatchReq* pReq = pTask->msgInfo.pData; - - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, - pTask->info.selfChildId, msgId); - - int32_t numOfRetry = 0; - for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { - SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); - if (pEntry == NULL) { - continue; - } - - if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { - continue; - } - - // downstream not rsp yet beyond threshold that is 10s - if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data - doSendFailedDispatch(pTask, pEntry, now, "timeout"); - numOfRetry += 1; - continue; - } - - // downstream inputQ is closed - if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { - doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); - 
numOfRetry += 1; - continue; - } - - // handle other errors - if (pEntry->status != TSDB_CODE_SUCCESS) { - doSendFailedDispatch(pTask, pEntry, now, "downstream error"); - numOfRetry += 1; - } - } - - stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, - numOfRetry, msgId); - } else { - int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; - SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; - int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - - int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); - SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); - if (pEntry != NULL) { - setResendInfo(pEntry, now); - code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); - - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, - pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); - } else { - stError("s-task:%s invalid index 0, size:%d", id, s); - } - } - } + code = sendFailedDispatchData(pTask, now); if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); @@ -880,7 +892,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - // todo: secure the timerActive and start timer in after lock pTask->lock + // todo: start timer in after lock pTask->lock streamMutexLock(&pTask->lock); bool shouldStop = streamTaskShouldStop(pTask); streamMutexUnlock(&pTask->lock); @@ -890,7 +902,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { streamMutexLock(&pTask->msgInfo.lock); if (pTask->msgInfo.inMonitor == 0) { -// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, tstrerror(code)); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); @@ -1873,8 
+1884,10 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S status = streamTaskAppendInputBlocks(pTask, pReq); pInfo->lastMsgId = pReq->msgId; } else { - stWarn("s-task:%s duplicate msgId:%d from upstream:0x%x, from vgId:%d already recv msgId:%" PRId64, id, - pReq->msgId, pReq->upstreamTaskId, pReq->srcVgId, pInfo->lastMsgId); + stWarn( + "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv " + "msgId:%" PRId64, + id, pReq->msgId, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->lastMsgId); status = TASK_INPUT_STATUS__NORMAL; // still return success } } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 54a8929123..4eb2e2e4e1 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -281,7 +281,6 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; -// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); if (code) { @@ -300,7 +299,6 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; if (streamTaskShouldStop(pTask)) { // record the failure -// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId, pInfo->hTaskId.taskId); From 05a27eb7b06a956ffd55f1e99a28520352099283 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 12 Jan 2025 01:50:54 +0800 Subject: [PATCH 6/8] fix(stream): fix memory leak. 
--- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 4b278d533d..4cdf77c7e2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -542,5 +542,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tQueryAutoQWorkerCleanup(&pMgmt->queryPool); tAutoQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); + tWWorkerCleanup(&pMgmt->streamCtrlPool); dDebug("vnode workers are closed"); } From eff1aff58eb1f31ee8ec17d13048fafb56448952 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Jan 2025 17:16:31 +0800 Subject: [PATCH 7/8] fix(stream): update the task's last msgId only after the data is successfully put into the input queue. --- source/libs/stream/src/streamDispatch.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 3d979f3d11..93f6c60da4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1882,7 +1882,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (pReq->msgId > pInfo->lastMsgId) { status = streamTaskAppendInputBlocks(pTask, pReq); - pInfo->lastMsgId = pReq->msgId; + if (status == TASK_INPUT_STATUS__NORMAL) { + stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %" PRId64, id, pInfo->lastMsgId, pReq->msgId); + pInfo->lastMsgId = pReq->msgId; + } else { + stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId); + } } else { stWarn( "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv " From 29421c17d87f14d62396b646a152ba62dd971ca7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Jan 2025 17:48:26 +0800 Subject: [PATCH 8/8] fix(stream): fix syntax error.
--- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 93f6c60da4..a411c757fc 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1883,7 +1883,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (pReq->msgId > pInfo->lastMsgId) { status = streamTaskAppendInputBlocks(pTask, pReq); if (status == TASK_INPUT_STATUS__NORMAL) { - stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %" PRId64, id, pInfo->lastMsgId, pReq->msgId); + stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %d", id, pInfo->lastMsgId, pReq->msgId); pInfo->lastMsgId = pReq->msgId; } else { stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId);