refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-06 18:17:38 +08:00
parent c982f6ac80
commit 18c66a5491
9 changed files with 121 additions and 175 deletions

View File

@ -799,8 +799,11 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamExecTask(SStreamTask* pTask);
int32_t streamResumeTask(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTrySchedExec(SStreamTask* pTask);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
int32_t streamTaskResumeInFuture(SStreamTask* pTask);
void streamTaskClearSchedIdleInfo(SStreamTask* pTask);
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime);
bool streamTaskShouldStop(const SStreamTask* pStatus);
bool streamTaskShouldPause(const SStreamTask* pStatus);

View File

@ -62,29 +62,14 @@ typedef struct SBuildScanWalMsgParam {
} SBuildScanWalMsgParam;
static void doStartScanWal(void* param, void* tmrId) {
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*) param;
int32_t vgId = pParam->pTq->pStreamMeta->vgId;
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
taosMemoryFree(pParam);
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
return;
}
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
STQ* pTq = pParam->pTq;
int32_t vgId = pTq->pStreamMeta->vgId;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pParam->pTq->pVnode->restored);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = 0;
pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pParam->pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
pTq->pVnode->restored);
/*int32_t code = */ streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
taosMemoryFree(pParam);
}
@ -161,24 +146,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
return -1;
}
tqDebug("vgId:%d create msg to stop all tasks async", vgId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = 0;
pRunReq->reqType = STREAM_EXEC_T_STOP_ALL_TASKS;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
return 0;
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
}
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
@ -394,7 +362,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if ((numOfItems > 0) || hasNewData) {
noDataInWal = false;
code = streamSchedExec(pTask);
code = streamTrySchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
return -1;

View File

@ -946,7 +946,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync((STQ*)handle, false);
} else {
streamSchedExec(pTask);
streamTrySchedExec(pTask);
}
} else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?

View File

@ -341,7 +341,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
taosThreadMutexLock(&pInfo->checkInfoLock);
}
if (!pInfo->inCheckProcess) {
if (pInfo->inCheckProcess) {
int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0;
stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el);

View File

@ -157,7 +157,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
return TSDB_CODE_OUT_OF_MEMORY;
}
streamSchedExec(pTask);
streamTrySchedExec(pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -1255,7 +1255,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
tmsgSendRsp(pRsp);
}
streamSchedExec(pTask);
streamTrySchedExec(pTask);
return 0;
}

View File

@ -533,8 +533,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
return code;
}
static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
/**
@ -559,26 +558,26 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (streamQueueIsFull(pTask->outputq.queue)) {
stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
setTaskSchedInfo(pTask, 500);
streamTaskSetIdleInfo(pTask, 500);
return 0;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
setTaskSchedInfo(pTask, 1000);
streamTaskSetIdleInfo(pTask, 1000);
return 0;
}
if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id);
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
return 0;
}
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (ret == EXEC_AFTER_IDLE) {
ASSERT(pInput == NULL && numOfBlocks == 0);
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
return 0;
} else {
if (pInput == NULL) {
@ -720,66 +719,6 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
}
}
static void doStreamExecTaskHelper(void* param, void* tmrId) {
SStreamTask* pTask = (SStreamTask*)param;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
streamTaskSetSchedStatusInactive(pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// task resume running
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("failed to create msg to resume s-task:%s, reason out of memory, ref:%d", pTask->id.idStr, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
pRunReq->head.vgId = pTask->info.nodeId;
pRunReq->streamId = pTask->id.streamId;
pRunReq->taskId = pTask->id.taskId;
pRunReq->reqType = STREAM_EXEC_T_RESUME_TASK;
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pTask->id.idStr, pTask->status.schedIdleTime, ref);
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
// release the task ref count
clearTaskSchedInfo(pTask);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
static int32_t schedTaskInFuture(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr,
pTask->status.schedIdleTime, ref);
// add one ref count for task
/*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask);
if (pTask->schedInfo.pIdleTimer == NULL) {
pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer);
} else {
taosTmrReset(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer);
}
return TSDB_CODE_SUCCESS;
}
int32_t streamResumeTask(SStreamTask* pTask) {
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__ACTIVE);
const char* id = pTask->id.idStr;
@ -793,7 +732,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
clearTaskSchedInfo(pTask);
streamTaskClearSchedIdleInfo(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
@ -806,7 +745,7 @@ int32_t streamResumeTask(SStreamTask* pTask) {
} else {
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
streamTaskResumeInFuture(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());

View File

@ -16,7 +16,103 @@
#include "streamInt.h"
#include "ttimer.h"
static void streamSchedByTimer(void* param, void* tmrId) {
static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamSchedByTimer(void* param, void* tmrId);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
return 0;
}
int32_t streamTrySchedExec(SStreamTask* pTask) {
if (streamTaskSetSchedStatusWait(pTask)) {
streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0);
} else {
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
}
return 0;
}
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
terrstr());
return -1;
}
stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
return TSDB_CODE_SUCCESS;
}
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
int32_t streamTaskResumeInFuture(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr,
pTask->status.schedIdleTime, ref);
// add one ref count for task
/*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask);
if (pTask->schedInfo.pIdleTimer == NULL) {
pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer);
} else {
taosTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer);
}
return TSDB_CODE_SUCCESS;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void streamTaskResumeHelper(void* param, void* tmrId) {
SStreamTask* pTask = (SStreamTask*)param;
SStreamTaskId* pId = &pTask->id;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
streamTaskSetSchedStatusInactive(pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime,
ref);
// release the task ref count
streamTaskClearSchedIdleInfo(pTask);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
const char* id = pTask->id.idStr;
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
@ -61,69 +157,9 @@ static void streamSchedByTimer(void* param, void* tmrId) {
return;
}
streamSchedExec(pTask);
streamTrySchedExec(pTask);
}
}
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
}
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
return 0;
}
int32_t streamSchedExec(SStreamTask* pTask) {
if (streamTaskSetSchedStatusWait(pTask)) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
return -1;
}
pRunReq->head.vgId = pTask->info.nodeId;
pRunReq->streamId = pTask->id.streamId;
pRunReq->taskId = pTask->id.taskId;
stDebug("trigger to run s-task:%s", pTask->id.idStr);
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
} else {
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
}
return 0;
}
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to create msg to start stream task:0x%x, type:%d, code:%s", vgId, taskId, execType,
terrstr());
return -1;
}
stDebug("vgId:%d create msg to start stream task:0x%x", vgId, taskId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
return TSDB_CODE_SUCCESS;
}

View File

@ -1042,5 +1042,5 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
if(code != 0){
return code;
}
return streamSchedExec(pTask);
return streamTrySchedExec(pTask);
}