From 6766e6db5398364ec24801079b4c496433dc1cd1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Feb 2025 00:57:18 +0800 Subject: [PATCH] refactor(stream): add checkpoint queue to handle chkpt. --- include/common/tmsgcb.h | 1 + include/common/tmsgdef.h | 1 + include/libs/stream/tstream.h | 4 +- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 3 ++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 3 ++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 50 ++++++++++++++++++++- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tq/tqStreamTask.c | 10 ++--- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 +-- source/dnode/vnode/src/vnd/vnodeSvr.c | 18 ++++++++ source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamDispatch.c | 4 +- source/libs/stream/src/streamSched.c | 35 ++++++++++----- source/libs/stream/src/streamTask.c | 2 +- 18 files changed, 119 insertions(+), 28 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 2847f4278a..3ce03d292f 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -39,6 +39,7 @@ typedef enum { ARB_QUEUE, STREAM_CTRL_QUEUE, STREAM_LONG_EXEC_QUEUE, + STREAM_CHKPT_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 8bdc9a9346..3ab7044e83 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -354,6 +354,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5b418e6e85..3262130bd4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -684,8 +684,8 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt); int32_t streamExecTask(SStreamTask* pTask); int32_t streamResumeTask(SStreamTask* pTask); -int32_t streamTrySchedExec(SStreamTask* pTask); -int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); +int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptExec); +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec); void streamTaskResumeInFuture(SStreamTask* pTask); void streamTaskClearSchedIdleInfo(SStreamTask* pTask); void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 9b4c11d6ae..583883590a 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -34,6 +34,7 @@ typedef struct SVnodeMgmt { SAutoQWorkerPool streamPool; SAutoQWorkerPool streamLongExecPool; SWWorkerPool streamCtrlPool; + SWWorkerPool streamChkPool; SWWorkerPool fetchPool; SSingleWorker mgmtWorker; SSingleWorker mgmtMultiWorker; @@ -77,6 +78,7 @@ typedef struct { STaosQueue *pStreamQ; STaosQueue *pStreamCtrlQ; STaosQueue *pStreamLongExecQ; + STaosQueue *pStreamChkQ; STaosQueue *pFetchQ; STaosQueue *pMultiMgmQ; } SVnodeObj; @@ -141,6 +143,7 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 1dea7d3cad..7ead7f8e8d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1019,6 +1019,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, vmPutMsgToStreamChkQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6f30977e10..e872e1f972 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -407,6 +407,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ)); while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50); + dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ); + while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10); + dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); dInfo("vgId:%d, post close", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 5acd06bbda..41f3c64e7d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -165,6 +165,34 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_ } } +static void vmProcessStreamChkptQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) { + SVnodeObj *pVnode = pInfo->ahandle; + void *pItem = NULL; + int32_t code = 0; + + while (1) { + if (taosGetQitem(pQall, &pItem) == 0) { + break; + } + + SRpcMsg *pMsg = pItem; + const STraceId *trace = &pMsg->info.traceId; + + dGTrace("vgId:%d, msg:%p get from vnode-stream-chkpt queue", pVnode->vgId, pMsg); + code = vnodeProcessStreamChkptMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + terrno = code; + dGError("vgId:%d, msg:%p failed to process stream chkpt msg %s since %s", pVnode->vgId, pMsg, + TMSG_INFO(pMsg->msgType), tstrerror(code)); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; const STraceId *trace = &pMsg->info.traceId; @@ -301,6 +329,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg); break; + case STREAM_CHKPT_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-stream-chkpt queue", pVnode->vgId, pMsg); + code = taosWriteQitem(pVnode->pStreamChkQ, pMsg); + break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); @@ -361,6 +393,8 @@ int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmP int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); } +int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CHKPT_QUEUE); } + int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); @@ -439,6 +473,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { case STREAM_LONG_EXEC_QUEUE: size = taosQueueItemSize(pVnode->pStreamLongExecQ); break; + case STREAM_CHKPT_QUEUE: + size = taosQueueItemSize(pVnode->pStreamChkQ); default: break; } @@ -487,10 +523,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2); pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue); pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1); + pVnode->pStreamChkQ = tWWorkerAllocQueue(&pMgmt->streamChkPool, pVnode, (FItems)vmProcessStreamChkptQueue); if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL - || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) { + || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL || pVnode->pStreamChkQ == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -509,6 +546,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ); dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ, taosQueueGetThreadId(pVnode->pStreamCtrlQ)); + dInfo("vgId:%d, stream-chk-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamChkQ, + taosQueueGetThreadId(pVnode->pStreamChkQ)); return 0; } @@ -517,6 +556,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ); tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ); + tWWorkerFreeQueue(&pMgmt->streamChkPool, pVnode->pStreamChkQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; pVnode->pFetchQ = NULL; @@ -525,6 +565,8 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pStreamCtrlQ = NULL; pVnode->pStreamLongExecQ = NULL; + pVnode->pStreamChkQ = NULL; + pVnode->pFetchQ = NULL; dDebug("vgId:%d, queue is freed", pVnode->vgId); } @@ -554,6 +596,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pStreamCtrlPool->max = 1; if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; + SWWorkerPool *pStreamChkPool = &pMgmt->streamChkPool; + pStreamChkPool->name = "vnode-stream-chkpt"; + pStreamChkPool->max = 1; + if ((code = tWWorkerInit(pStreamChkPool)) != 0) return code; + SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->max = tsNumOfVnodeFetchThreads; @@ -587,6 +634,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tAutoQWorkerCleanup(&pMgmt->streamPool); tAutoQWorkerCleanup(&pMgmt->streamLongExecPool); tWWorkerCleanup(&pMgmt->streamCtrlPool); + tWWorkerCleanup(&pMgmt->streamChkPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6eee8c510b..12609e2c06 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -92,7 +92,7 @@ FAIL: } int32_t sndInit(SSnode *pSnode) { - if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS) != 0) { + if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false) != 0) { sndError("failed to start all tasks"); } return 0; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 871a8c06e1..ddd9878eba 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -114,6 +114,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index d98ce58161..645dbb2a1a 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -109,7 +109,7 @@ static void doStartScanWal(void* param, void* tmrId) { taosMsleep(10000); #endif - code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } @@ -170,7 +170,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t tqStopStreamTasksAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; - return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS); + return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false); } int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { @@ -276,7 +276,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { // check whether input queue is full or not if (streamQueueIsFull(pTask->inputq.queue)) { tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr); - streamTrySchedExec(pTask); + streamTrySchedExec(pTask, false); return false; } @@ -412,7 +412,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { streamMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { - code = streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask, false); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); taosArrayDestroy(pTaskList); @@ -465,7 +465,7 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) { tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, numOfTasks, alreadyRestored); - return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false); } void streamMetaFreeTQDuringScanWalError(STQ* pTq) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 76f0fd6bc6..0b206110b3 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -123,7 +123,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); int32_t type = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS; - return streamTaskSchedTask(cb, vgId, 0, 0, type); + return streamTaskSchedTask(cb, vgId, 0, 0, type, false); } int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { @@ -135,7 +135,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream } tqDebug("vgId:%d start task:0x%x async", vgId, taskId); - return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); + return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false); } // this is to process request from transaction, always return true. @@ -1177,7 +1177,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { code = tqScanWalAsync((STQ*)handle, false); } else { - code = streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask, false); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f37dd94106..aa55885f15 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1005,6 +1005,24 @@ int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo } } +int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("vgId:%d, msg:%p in stream chkpt queue is processing", pVnode->config.vgId, pMsg); + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || + pMsg->msgType == TDMT_VND_BATCH_META) && + !syncIsReadyForRead(pVnode->sync)) { + vnodeRedirectRpcMsg(pVnode, pMsg, terrno); + return 0; + } + + switch (pMsg->msgType) { + case TDMT_STREAM_CHKPT_EXEC: + return tqProcessTaskRunReq(pVnode->pTq, pMsg); + default: + vError("unknown msg type:%d in stream chkpt queue", pMsg->msgType); + return TSDB_CODE_APP_ERROR; + } +} + void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); if (code) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 068f4dec3d..74ffb69716 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -594,7 +594,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) streamMetaWUnLock(pMeta); tqInfo("vgId:%d stream task already loaded, start them", vgId); - int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); + int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false); if (code != 0) { tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code)); } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index f880526541..2185df5ec0 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -717,7 +717,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // of restart in timer thread will result in a deadlock. int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { - return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); + return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false); } static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a43cdd0b85..020a02db20 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -93,7 +93,7 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i return TSDB_CODE_OUT_OF_MEMORY; } - return streamTrySchedExec(pTask); + return streamTrySchedExec(pTask, true); } int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b093f808c0..6cb68b7604 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1834,6 +1834,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t status = 0; SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr; + bool chkptMsg = false; stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); @@ -1863,6 +1864,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // blocked. Note that there is no race condition here. if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + chkptMsg = true; stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId); @@ -1890,5 +1892,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S tmsgSendRsp(pRsp); } - return streamTrySchedExec(pTask); + return streamTrySchedExec(pTask, chkptMsg); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 1f76f349ae..7d3d5dcc49 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -93,9 +93,9 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { } } -int32_t streamTrySchedExec(SStreamTask* pTask) { +int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptQueue) { if (streamTaskSetSchedStatusWait(pTask)) { - return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0); + return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0, chkptQueue); } else { stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); } @@ -103,7 +103,7 @@ int32_t streamTrySchedExec(SStreamTask* pTask) { return 0; } -int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec) { int32_t code = 0; int32_t tlen = 0; @@ -142,10 +142,18 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; - code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); - if (code) { - stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + if (chkptExec) { + SRpcMsg msg = {.msgType = TDMT_STREAM_CHKPT_EXEC, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_CHKPT_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream chkpt queue, code:%s, %x", vgId, tstrerror(code), taskId); + } + } else { + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + } } return code; } @@ -191,12 +199,17 @@ void streamTaskResumeHelper(void* param, void* tmrId) { return; } - code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); + code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK, + (p.state == TASK_STATUS__CK)); if (code) { stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code)); } else { - stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime); - + if (p.state == TASK_STATUS__CK) { + stDebug("trigger to resume s-task:%s in stream chkpt queue after idled for %dms", pId->idStr, + pTask->status.schedIdleTime); + } else { + stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime); + } // release the task ref count streamTaskClearSchedIdleInfo(pTask); } @@ -339,7 +352,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } } - code = streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask, false); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e4e8a37b37..edada9a050 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1185,7 +1185,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { if (code != 0) { return code; } - return streamTrySchedExec(pTask); + return streamTrySchedExec(pTask, false); } void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }