refactor(stream): add checkpoint queue to handle chkpt.

This commit is contained in:
Haojun Liao 2025-02-11 00:57:18 +08:00
parent 5f9ce8826d
commit 6766e6db53
18 changed files with 119 additions and 28 deletions

View File

@ -39,6 +39,7 @@ typedef enum {
ARB_QUEUE,
STREAM_CTRL_QUEUE,
STREAM_LONG_EXEC_QUEUE,
STREAM_CHKPT_QUEUE,
QUEUE_MAX,
} EQueueType;

View File

@ -354,6 +354,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8

View File

@ -684,8 +684,8 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
int32_t streamExecTask(SStreamTask* pTask);
int32_t streamResumeTask(SStreamTask* pTask);
int32_t streamTrySchedExec(SStreamTask* pTask);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptExec);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec);
void streamTaskResumeInFuture(SStreamTask* pTask);
void streamTaskClearSchedIdleInfo(SStreamTask* pTask);
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime);

View File

@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
SAutoQWorkerPool streamPool;
SAutoQWorkerPool streamLongExecPool;
SWWorkerPool streamCtrlPool;
SWWorkerPool streamChkPool;
SWWorkerPool fetchPool;
SSingleWorker mgmtWorker;
SSingleWorker mgmtMultiWorker;
@ -77,6 +78,7 @@ typedef struct {
STaosQueue *pStreamQ;
STaosQueue *pStreamCtrlQ;
STaosQueue *pStreamLongExecQ;
STaosQueue *pStreamChkQ;
STaosQueue *pFetchQ;
STaosQueue *pMultiMgmQ;
} SVnodeObj;
@ -141,6 +143,7 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -1019,6 +1019,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, vmPutMsgToStreamChkQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

View File

@ -407,6 +407,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ);
while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10);
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
dInfo("vgId:%d, post close", pVnode->vgId);

View File

@ -165,6 +165,34 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
}
}
static void vmProcessStreamChkptQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) {
SVnodeObj *pVnode = pInfo->ahandle;
void *pItem = NULL;
int32_t code = 0;
while (1) {
if (taosGetQitem(pQall, &pItem) == 0) {
break;
}
SRpcMsg *pMsg = pItem;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-stream-chkpt queue", pVnode->vgId, pMsg);
code = vnodeProcessStreamChkptMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
terrno = code;
dGError("vgId:%d, msg:%p failed to process stream chkpt msg %s since %s", pVnode->vgId, pMsg,
TMSG_INFO(pMsg->msgType), tstrerror(code));
vmSendRsp(pMsg, code);
}
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
const STraceId *trace = &pMsg->info.traceId;
@ -301,6 +329,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg);
break;
case STREAM_CHKPT_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-stream-chkpt queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pStreamChkQ, pMsg);
break;
case FETCH_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
@ -361,6 +393,8 @@ int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmP
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); }
int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CHKPT_QUEUE); }
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
@ -439,6 +473,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
case STREAM_LONG_EXEC_QUEUE:
size = taosQueueItemSize(pVnode->pStreamLongExecQ);
break;
case STREAM_CHKPT_QUEUE:
size = taosQueueItemSize(pVnode->pStreamChkQ);
default:
break;
}
@ -487,10 +523,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2);
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1);
pVnode->pStreamChkQ = tWWorkerAllocQueue(&pMgmt->streamChkPool, pVnode, (FItems)vmProcessStreamChkptQueue);
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
|| pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) {
|| pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL || pVnode->pStreamChkQ == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -509,6 +546,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ);
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
taosQueueGetThreadId(pVnode->pStreamCtrlQ));
dInfo("vgId:%d, stream-chk-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamChkQ,
taosQueueGetThreadId(pVnode->pStreamChkQ));
return 0;
}
@ -517,6 +556,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ);
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
tWWorkerFreeQueue(&pMgmt->streamChkPool, pVnode->pStreamChkQ);
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL;
@ -525,6 +565,8 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pStreamCtrlQ = NULL;
pVnode->pStreamLongExecQ = NULL;
pVnode->pStreamChkQ = NULL;
pVnode->pFetchQ = NULL;
dDebug("vgId:%d, queue is freed", pVnode->vgId);
}
@ -554,6 +596,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pStreamCtrlPool->max = 1;
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
SWWorkerPool *pStreamChkPool = &pMgmt->streamChkPool;
pStreamChkPool->name = "vnode-stream-chkpt";
pStreamChkPool->max = 1;
if ((code = tWWorkerInit(pStreamChkPool)) != 0) return code;
SWWorkerPool *pFPool = &pMgmt->fetchPool;
pFPool->name = "vnode-fetch";
pFPool->max = tsNumOfVnodeFetchThreads;
@ -587,6 +634,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
tAutoQWorkerCleanup(&pMgmt->streamPool);
tAutoQWorkerCleanup(&pMgmt->streamLongExecPool);
tWWorkerCleanup(&pMgmt->streamCtrlPool);
tWWorkerCleanup(&pMgmt->streamChkPool);
tWWorkerCleanup(&pMgmt->fetchPool);
dDebug("vnode workers are closed");
}

View File

@ -92,7 +92,7 @@ FAIL:
}
int32_t sndInit(SSnode *pSnode) {
if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS) != 0) {
if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false) != 0) {
sndError("failed to start all tasks");
}
return 0;

View File

@ -114,6 +114,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);

View File

@ -109,7 +109,7 @@ static void doStartScanWal(void* param, void* tmrId) {
taosMsleep(10000);
#endif
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false);
if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
@ -170,7 +170,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false);
}
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
@ -276,7 +276,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
// check whether input queue is full or not
if (streamQueueIsFull(pTask->inputq.queue)) {
tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
streamTrySchedExec(pTask);
streamTrySchedExec(pTask, false);
return false;
}
@ -412,7 +412,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
streamMutexUnlock(&pTask->lock);
if ((numOfItems > 0) || hasNewData) {
code = streamTrySchedExec(pTask);
code = streamTrySchedExec(pTask, false);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
taosArrayDestroy(pTaskList);
@ -465,7 +465,7 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
numOfTasks, alreadyRestored);
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false);
}
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {

View File

@ -123,7 +123,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
int32_t type = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS;
return streamTaskSchedTask(cb, vgId, 0, 0, type);
return streamTaskSchedTask(cb, vgId, 0, 0, type, false);
}
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
@ -135,7 +135,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
}
tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
}
// this is to process request from transaction, always return true.
@ -1177,7 +1177,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
code = tqScanWalAsync((STQ*)handle, false);
} else {
code = streamTrySchedExec(pTask);
code = streamTrySchedExec(pTask, false);
}
}

View File

@ -1005,6 +1005,24 @@ int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo
}
}
int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in stream chkpt queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
switch (pMsg->msgType) {
case TDMT_STREAM_CHKPT_EXEC:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream chkpt queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
}
}
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
if (code) {

View File

@ -594,7 +594,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d stream task already loaded, start them", vgId);
int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false);
if (code != 0) {
tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
}

View File

@ -717,7 +717,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
// of restart in timer thread will result in a deadlock.
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false);
}
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {

View File

@ -93,7 +93,7 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i
return TSDB_CODE_OUT_OF_MEMORY;
}
return streamTrySchedExec(pTask);
return streamTrySchedExec(pTask, true);
}
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {

View File

@ -1834,6 +1834,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t status = 0;
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
bool chkptMsg = false;
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
@ -1863,6 +1864,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// blocked. Note that there is no race condition here.
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
chkptMsg = true;
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
@ -1890,5 +1892,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
tmsgSendRsp(pRsp);
}
return streamTrySchedExec(pTask);
return streamTrySchedExec(pTask, chkptMsg);
}

View File

@ -93,9 +93,9 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
}
}
int32_t streamTrySchedExec(SStreamTask* pTask) {
int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptQueue) {
if (streamTaskSetSchedStatusWait(pTask)) {
return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0);
return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0, chkptQueue);
} else {
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
}
@ -103,7 +103,7 @@ int32_t streamTrySchedExec(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec) {
int32_t code = 0;
int32_t tlen = 0;
@ -142,10 +142,18 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
}
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
if (chkptExec) {
SRpcMsg msg = {.msgType = TDMT_STREAM_CHKPT_EXEC, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
code = tmsgPutToQueue(pMsgCb, STREAM_CHKPT_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream chkpt queue, code:%s, %x", vgId, tstrerror(code), taskId);
}
} else {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
}
}
return code;
}
@ -191,12 +199,17 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
return;
}
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK,
(p.state == TASK_STATUS__CK));
if (code) {
stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code));
} else {
stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime);
if (p.state == TASK_STATUS__CK) {
stDebug("trigger to resume s-task:%s in stream chkpt queue after idled for %dms", pId->idStr,
pTask->status.schedIdleTime);
} else {
stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime);
}
// release the task ref count
streamTaskClearSchedIdleInfo(pTask);
}
@ -339,7 +352,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
}
}
code = streamTrySchedExec(pTask);
code = streamTrySchedExec(pTask, false);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
}

View File

@ -1185,7 +1185,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
if (code != 0) {
return code;
}
return streamTrySchedExec(pTask);
return streamTrySchedExec(pTask, false);
}
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }