diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index cdff5aaba2..4af223b537 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -40,6 +40,7 @@ typedef enum { ARB_QUEUE, STREAM_CTRL_QUEUE, STREAM_LONG_EXEC_QUEUE, + STREAM_CHKPT_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 13a26910c1..2b367939ff 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -355,6 +355,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 372322c0b8..bdb8bde2da 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -702,8 +702,8 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt); int32_t streamExecTask(SStreamTask* pTask); int32_t streamResumeTask(SStreamTask* pTask); -int32_t streamTrySchedExec(SStreamTask* pTask); -int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); +int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptExec); +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec); void streamTaskResumeInFuture(SStreamTask* pTask); void streamTaskClearSchedIdleInfo(SStreamTask* pTask); void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime); diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 9d519e88f0..0df4b1c58a 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -47,7 +47,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg); int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); -void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg); +int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 024e2e4e99..11710d7b39 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -102,6 +102,8 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER; + code = 0; _OVER: if (code != 0) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 1e882fc656..1255542454 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -162,6 +162,9 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { case WRITE_QUEUE: code = smPutNodeMsgToWriteQueue(pMgmt, pMsg); break; + case STREAM_CHKPT_QUEUE: + code = smPutNodeMsgToStreamQueue(pMgmt, pMsg); + break; default: code = TSDB_CODE_INVALID_PARA; rpcFreeCont(pMsg->pCont); @@ -172,7 +175,6 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t code = 0; SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0); if (pWorker == NULL) { return TSDB_CODE_INVALID_MSG; @@ -198,3 +200,10 @@ int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } + +//int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { +// SSingleWorker *pWorker = &pMgmt->chkptWorker; +// +// dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); +// return taosWriteQitem(pWorker->queue, pMsg); +//} diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index e33730130d..cb41bc0ea7 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -34,6 +34,7 @@ typedef struct SVnodeMgmt { SAutoQWorkerPool streamPool; SAutoQWorkerPool streamLongExecPool; SWWorkerPool streamCtrlPool; + SWWorkerPool streamChkPool; SWWorkerPool fetchPool; SSingleWorker mgmtWorker; SSingleWorker mgmtMultiWorker; @@ -77,6 +78,7 @@ typedef struct { STaosQueue *pStreamQ; STaosQueue *pStreamCtrlQ; STaosQueue *pStreamLongExecQ; + STaosQueue *pStreamChkQ; STaosQueue *pFetchQ; STaosQueue *pMultiMgmQ; } SVnodeObj; @@ -141,6 +143,7 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index fc8ff3133a..8a18727b99 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1022,6 +1022,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, vmPutMsgToStreamChkQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 8871cd575f..5da3b2ce9a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -407,6 +407,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ)); while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50); + dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ); + while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10); + dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); dInfo("vgId:%d, post close", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 5acd06bbda..41f3c64e7d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -165,6 +165,34 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_ } } +static void vmProcessStreamChkptQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) { + SVnodeObj *pVnode = pInfo->ahandle; + void *pItem = NULL; + int32_t code = 0; + + while (1) { + if (taosGetQitem(pQall, &pItem) == 0) { + break; + } + + SRpcMsg *pMsg = pItem; + const STraceId *trace = &pMsg->info.traceId; + + dGTrace("vgId:%d, msg:%p get from vnode-stream-chkpt queue", pVnode->vgId, pMsg); + code = vnodeProcessStreamChkptMsg(pVnode->pImpl, pMsg, pInfo); + if (code != 0) { + terrno = code; + dGError("vgId:%d, msg:%p failed to process stream chkpt msg %s since %s", pVnode->vgId, pMsg, + TMSG_INFO(pMsg->msgType), tstrerror(code)); + vmSendRsp(pMsg, code); + } + + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; const STraceId *trace = &pMsg->info.traceId; @@ -301,6 +329,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg); break; + case STREAM_CHKPT_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-stream-chkpt queue", pVnode->vgId, pMsg); + code = taosWriteQitem(pVnode->pStreamChkQ, pMsg); + break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); @@ -361,6 +393,8 @@ int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmP int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); } +int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CHKPT_QUEUE); } + int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); @@ -439,6 +473,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { case STREAM_LONG_EXEC_QUEUE: size = taosQueueItemSize(pVnode->pStreamLongExecQ); break; + case STREAM_CHKPT_QUEUE: + size = taosQueueItemSize(pVnode->pStreamChkQ); default: break; } @@ -487,10 +523,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2); pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue); pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1); + pVnode->pStreamChkQ = tWWorkerAllocQueue(&pMgmt->streamChkPool, pVnode, (FItems)vmProcessStreamChkptQueue); if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL - || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) { + || pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL || pVnode->pStreamChkQ == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -509,6 +546,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ); dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ, taosQueueGetThreadId(pVnode->pStreamCtrlQ)); + dInfo("vgId:%d, stream-chk-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamChkQ, + taosQueueGetThreadId(pVnode->pStreamChkQ)); return 0; } @@ -517,6 +556,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ); tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ); + tWWorkerFreeQueue(&pMgmt->streamChkPool, pVnode->pStreamChkQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); pVnode->pQueryQ = NULL; pVnode->pFetchQ = NULL; @@ -525,6 +565,8 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pStreamCtrlQ = NULL; pVnode->pStreamLongExecQ = NULL; + pVnode->pStreamChkQ = NULL; + pVnode->pFetchQ = NULL; dDebug("vgId:%d, queue is freed", pVnode->vgId); } @@ -554,6 +596,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pStreamCtrlPool->max = 1; if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; + SWWorkerPool *pStreamChkPool = &pMgmt->streamChkPool; + pStreamChkPool->name = "vnode-stream-chkpt"; + pStreamChkPool->max = 1; + if ((code = tWWorkerInit(pStreamChkPool)) != 0) return code; + SWWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->max = tsNumOfVnodeFetchThreads; @@ -587,6 +634,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tAutoQWorkerCleanup(&pMgmt->streamPool); tAutoQWorkerCleanup(&pMgmt->streamLongExecPool); tWWorkerCleanup(&pMgmt->streamCtrlPool); + tWWorkerCleanup(&pMgmt->streamChkPool); tWWorkerCleanup(&pMgmt->fetchPool); dDebug("vnode workers are closed"); } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 851bf25665..dcdc70da68 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -92,7 +92,7 @@ FAIL: } int32_t sndInit(SSnode *pSnode) { - if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS) != 0) { + if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false) != 0) { sndError("failed to start all tasks"); } return 0; @@ -138,6 +138,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg); + case TDMT_STREAM_CHKPT_EXEC: + return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true); default: sndError("invalid snode msg:%d", pMsg->msgType); return TSDB_CODE_INVALID_MSG; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d224f9a411..a85dca05d5 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -114,6 +114,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b9cbb33746..6a6b3574db 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -165,7 +165,7 @@ static void doStartScanWal(void* param, void* tmrId) { taosMsleep(10000); #endif - code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } @@ -216,7 +216,7 @@ void tqScanWalAsync(STQ* pTq) { } int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) { - return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS); + return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false); } int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { @@ -322,7 +322,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { // check whether input queue is full or not if (streamQueueIsFull(pTask->inputq.queue)) { tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr); - int32_t code = streamTrySchedExec(pTask); + int32_t code = streamTrySchedExec(pTask, false); if (code) { tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr); } @@ -461,7 +461,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) { streamMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { - code = streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask, false); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); taosArrayDestroy(pTaskList); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4295969c85..64072bbda1 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -131,7 +131,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); int32_t type = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS; - return streamTaskSchedTask(cb, vgId, 0, 0, type); + return streamTaskSchedTask(cb, vgId, 0, 0, type, false); } int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { @@ -143,7 +143,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream } tqDebug("vgId:%d start task:0x%x async", vgId, taskId); - return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); + return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false); } // this is to process request from transaction, always return true. @@ -1222,7 +1222,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { // code = tqScanWalAsync((STQ*)handle, false); } else { - code = streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask, false); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 22d9c2657d..ea4ca896ab 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -940,6 +940,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg); + + // todo: NOTE: some command needs to run on follower, such as, stop_all_tasks if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { @@ -1016,6 +1018,24 @@ int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo } } +int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { + vTrace("vgId:%d, msg:%p in stream chkpt queue is processing", pVnode->config.vgId, pMsg); + if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || + pMsg->msgType == TDMT_VND_BATCH_META) && + !syncIsReadyForRead(pVnode->sync)) { + vnodeRedirectRpcMsg(pVnode, pMsg, terrno); + return 0; + } + + switch (pMsg->msgType) { + case TDMT_STREAM_CHKPT_EXEC: + return tqProcessTaskRunReq(pVnode->pTq, pMsg); + default: + vError("unknown msg type:%d in stream chkpt queue", pMsg->msgType); + return TSDB_CODE_APP_ERROR; + } +} + void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); if (code) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2a06686a31..317fd3307e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -597,7 +597,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) streamMetaWUnLock(pMeta); tqInfo("vgId:%d stream task already loaded, start them", vgId); - int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); + int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false); if (code != 0) { tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code)); } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index ebb13654b7..29da3fc88e 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -717,7 +717,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // of restart in timer thread will result in a deadlock. int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { - return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); + return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false); } static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f07d6f4cc1..afda288b90 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -93,7 +93,7 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i return TSDB_CODE_OUT_OF_MEMORY; } - return streamTrySchedExec(pTask); + return streamTrySchedExec(pTask, true); } int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 24ac193937..f08b4a7922 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1834,6 +1834,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t status = 0; SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr; + bool chkptMsg = false; stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); @@ -1863,6 +1864,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // blocked. Note that there is no race condition here. if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + chkptMsg = true; stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId); @@ -1890,5 +1892,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S tmsgSendRsp(pRsp); } - return streamTrySchedExec(pTask); + return streamTrySchedExec(pTask, chkptMsg); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7c7ef83d6d..5a507358ee 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -144,11 +144,20 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem // let's try the ordinary input q pQueue->qItem = NULL; - int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem); + int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem); + if (code) { + stError("s-task:%s failed to extract data from inputQ, code:%s", id, tstrerror(code)); + } if (pQueue->qItem == NULL) { - num = taosReadAllQitems(pQueue->pQueue, pQueue->qall); - num = taosGetQitem(pQueue->qall, &pQueue->qItem); + code = taosReadAllQitems(pQueue->pQueue, pQueue->qall); + if (code) { + stError("s-task:%s failed to read qitem into qall, code:%s", id, tstrerror(code)); + } + code = taosGetQitem(pQueue->qall, &pQueue->qItem); + if (code) { + stError("s-task:%s failed to extract data from inputQ(qall), code:%s", id, tstrerror(code)); + } } *pItem = streamQueueCurItem(pQueue); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 1f76f349ae..367e54d1a1 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -93,17 +93,21 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { } } -int32_t streamTrySchedExec(SStreamTask* pTask) { +int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptQueue) { if (streamTaskSetSchedStatusWait(pTask)) { - return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0); + return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0, chkptQueue); } else { - stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); + if (chkptQueue) { + stWarn("s-task:%s not launch task in chkpt queue, may delay checkpoint procedure", pTask->id.idStr); + } else { + stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); + } } return 0; } -int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec) { int32_t code = 0; int32_t tlen = 0; @@ -142,10 +146,18 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; - code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); - if (code) { - stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + if (chkptExec) { + SRpcMsg msg = {.msgType = TDMT_STREAM_CHKPT_EXEC, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_CHKPT_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream chkpt queue, code:%s, %x", vgId, tstrerror(code), taskId); + } + } else { + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)}; + code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + } } return code; } @@ -191,12 +203,17 @@ void streamTaskResumeHelper(void* param, void* tmrId) { return; } - code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); + code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK, + (p.state == TASK_STATUS__CK)); if (code) { stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code)); } else { - stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime); - + if (p.state == TASK_STATUS__CK) { + stDebug("trigger to resume s-task:%s in stream chkpt queue after idled for %dms", pId->idStr, + pTask->status.schedIdleTime); + } else { + stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime); + } // release the task ref count streamTaskClearSchedIdleInfo(pTask); } @@ -339,7 +356,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } } - code = streamTrySchedExec(pTask); + code = streamTrySchedExec(pTask, false); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 378aaa27d0..f7ae4915ec 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1194,7 +1194,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { if (code != 0) { return code; } - return streamTrySchedExec(pTask); + return streamTrySchedExec(pTask, false); } void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; } diff --git a/tests/system-test/8-stream/checkpoint_info2.py b/tests/system-test/8-stream/checkpoint_info2.py index 3dc57477f7..f4c8da8c9d 100644 --- a/tests/system-test/8-stream/checkpoint_info2.py +++ b/tests/system-test/8-stream/checkpoint_info2.py @@ -22,6 +22,8 @@ from util.cluster import * # should be used by -N option class TDTestCase: updatecfgDict = {'checkpointInterval': 60 , + 'vdebugflag':143, + 'ddebugflag':143 } def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar)