diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a8a695cc5b..1db62abfc0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -799,8 +799,11 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t void streamTaskInputFail(SStreamTask* pTask); int32_t streamExecTask(SStreamTask* pTask); int32_t streamResumeTask(SStreamTask* pTask); -int32_t streamSchedExec(SStreamTask* pTask); +int32_t streamTrySchedExec(SStreamTask* pTask); int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); +int32_t streamTaskResumeInFuture(SStreamTask* pTask); +void streamTaskClearSchedIdleInfo(SStreamTask* pTask); +void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime); bool streamTaskShouldStop(const SStreamTask* pStatus); bool streamTaskShouldPause(const SStreamTask* pStatus); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index e22ebb9d84..5a29f67ae3 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -62,29 +62,14 @@ typedef struct SBuildScanWalMsgParam { } SBuildScanWalMsgParam; static void doStartScanWal(void* param, void* tmrId) { - SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*) param; - - int32_t vgId = pParam->pTq->pStreamMeta->vgId; - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - taosMemoryFree(pParam); - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - return; - } + SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; + STQ* pTq = pParam->pTq; + int32_t vgId = pTq->pStreamMeta->vgId; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pParam->pTq->pVnode->restored); - - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = 0; - pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pParam->pTq->pVnode->msgCb, STREAM_QUEUE, &msg); + pTq->pVnode->restored); + /*int32_t code = */ streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); taosMemoryFree(pParam); } @@ -161,24 +146,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t tqStopStreamTasksAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); - return -1; - } - - tqDebug("vgId:%d create msg to stop all tasks async", vgId); - - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = 0; - pRunReq->reqType = STREAM_EXEC_T_STOP_ALL_TASKS; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - return 0; + return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS); } int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { @@ -394,7 +362,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { if ((numOfItems > 0) || hasNewData) { noDataInWal = false; - code = streamSchedExec(pTask); + code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); return -1; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 45dec1d859..be3023122f 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -946,7 +946,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { tqScanWalAsync((STQ*)handle, false); } else { - streamSchedExec(pTask); + streamTrySchedExec(pTask); } } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 7a4c6becf8..f0f12cae2b 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -341,7 +341,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* taosThreadMutexLock(&pInfo->checkInfoLock); } - if (!pInfo->inCheckProcess) { + if (pInfo->inCheckProcess) { int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e66458ca1d..d2f9a0cbc3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -157,7 +157,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint return TSDB_CODE_OUT_OF_MEMORY; } - streamSchedExec(pTask); + streamTrySchedExec(pTask); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 43d9beb537..d56b347f4c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1255,7 +1255,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S tmsgSendRsp(pRsp); } - streamSchedExec(pTask); + streamTrySchedExec(pTask); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 250866005e..047b169ec9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -533,8 +533,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc return code; } -static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } -static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } +//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } /** @@ -559,26 +558,26 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (streamQueueIsFull(pTask->outputq.queue)) { stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id); - setTaskSchedInfo(pTask, 500); + streamTaskSetIdleInfo(pTask, 500); return 0; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id); - setTaskSchedInfo(pTask, 1000); + streamTaskSetIdleInfo(pTask, 1000); return 0; } if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); - setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { ASSERT(pInput == NULL && numOfBlocks == 0); - setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } else { if (pInput == NULL) { @@ -720,66 +719,6 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } -static void doStreamExecTaskHelper(void* param, void* tmrId) { - SStreamTask* pTask = (SStreamTask*)param; - - SStreamTaskState* p = streamTaskGetStatus(pTask); - if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { - streamTaskSetSchedStatusInactive(pTask); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p->name, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // task resume running - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("failed to create msg to resume s-task:%s, reason out of memory, ref:%d", pTask->id.idStr, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - pRunReq->head.vgId = pTask->info.nodeId; - pRunReq->streamId = pTask->id.streamId; - pRunReq->taskId = pTask->id.taskId; - pRunReq->reqType = STREAM_EXEC_T_RESUME_TASK; - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pTask->id.idStr, pTask->status.schedIdleTime, ref); - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); - - // release the task ref count - clearTaskSchedInfo(pTask); - streamMetaReleaseTask(pTask->pMeta, pTask); -} - -static int32_t schedTaskInFuture(SStreamTask* pTask) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr, - pTask->status.schedIdleTime, ref); - - // add one ref count for task - /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask); - - if (pTask->schedInfo.pIdleTimer == NULL) { - pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer); - } else { - taosTmrReset(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer); - } - - return TSDB_CODE_SUCCESS; -} - int32_t streamResumeTask(SStreamTask* pTask) { ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__ACTIVE); const char* id = pTask->id.idStr; @@ -793,7 +732,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - clearTaskSchedInfo(pTask); + streamTaskClearSchedIdleInfo(pTask); taosThreadMutexUnlock(&pTask->lock); setLastExecTs(pTask, taosGetTimestampMs()); @@ -806,7 +745,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { } else { // check if this task needs to be idle for a while if (pTask->status.schedIdleTime > 0) { - schedTaskInFuture(pTask); + streamTaskResumeInFuture(pTask); taosThreadMutexUnlock(&pTask->lock); setLastExecTs(pTask, taosGetTimestampMs()); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index f8ef4e4f71..2e337234b6 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -16,7 +16,103 @@ #include "streamInt.h" #include "ttimer.h" -static void streamSchedByTimer(void* param, void* tmrId) { +static void streamTaskResumeHelper(void* param, void* tmrId); +static void streamSchedByTimer(void* param, void* tmrId); + +int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { + if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); + + stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); + + pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); + pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; + } + + return 0; +} + +int32_t streamTrySchedExec(SStreamTask* pTask) { + if (streamTaskSetSchedStatusWait(pTask)) { + streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0); + } else { + stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); + } + + return 0; +} + +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, + terrstr()); + return -1; + } + + stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = execType; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + return TSDB_CODE_SUCCESS; +} + +void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } + +void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } + +int32_t streamTaskResumeInFuture(SStreamTask* pTask) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr, + pTask->status.schedIdleTime, ref); + + // add one ref count for task + /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask); + + if (pTask->schedInfo.pIdleTimer == NULL) { + pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer); + } else { + taosTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer); + } + + return TSDB_CODE_SUCCESS; +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +void streamTaskResumeHelper(void* param, void* tmrId) { + SStreamTask* pTask = (SStreamTask*)param; + SStreamTaskId* pId = &pTask->id; + SStreamTaskState* p = streamTaskGetStatus(pTask); + + if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { + streamTaskSetSchedStatusInactive(pTask); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p->name, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, + ref); + + // release the task ref count + streamTaskClearSchedIdleInfo(pTask); + streamMetaReleaseTask(pTask->pMeta, pTask); +} + +void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; const char* id = pTask->id.idStr; int32_t nextTrigger = (int32_t)pTask->info.triggerParam; @@ -61,69 +157,9 @@ static void streamSchedByTimer(void* param, void* tmrId) { return; } - streamSchedExec(pTask); + streamTrySchedExec(pTask); } } taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); } - -int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); - - stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - - pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); - pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; - } - - return 0; -} - -int32_t streamSchedExec(SStreamTask* pTask) { - if (streamTaskSetSchedStatusWait(pTask)) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); - stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr); - return -1; - } - - pRunReq->head.vgId = pTask->info.nodeId; - pRunReq->streamId = pTask->id.streamId; - pRunReq->taskId = pTask->id.taskId; - - stDebug("trigger to run s-task:%s", pTask->id.idStr); - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); - } else { - stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); - } - - return 0; -} - -int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("vgId:%d failed to create msg to start stream task:0x%x, type:%d, code:%s", vgId, taskId, execType, - terrstr()); - return -1; - } - - stDebug("vgId:%d create msg to start stream task:0x%x", vgId, taskId); - - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = execType; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); - return TSDB_CODE_SUCCESS; -} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1f0cbc1695..c7e34987ab 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1042,5 +1042,5 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { if(code != 0){ return code; } - return streamSchedExec(pTask); + return streamTrySchedExec(pTask); } \ No newline at end of file