fix(stream): start scheduler task after set the refId.

This commit is contained in:
Haojun Liao 2024-10-29 19:14:37 +08:00
parent 0bc5f8cb88
commit 9bae0adba6
8 changed files with 28 additions and 13 deletions

View File

@ -38,7 +38,6 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
streamTaskOpenAllUpstreamInput(pTask);
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask);

View File

@ -765,7 +765,6 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
}
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask);

View File

@ -750,7 +750,7 @@ void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}

View File

@ -1008,7 +1008,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}

View File

@ -539,7 +539,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}
@ -1082,7 +1082,7 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}

View File

@ -745,6 +745,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
// enable the scheduler for stream tasks
streamSetupScheduleTrigger(pTask);
*pAdded = true;
return code;
}
@ -1147,6 +1150,9 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
// enable the scheduler for stream tasks after acquire the task RefId.
streamSetupScheduleTrigger(pTask);
stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId);
if (pTask->info.fillHistory == 0) {
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);

View File

@ -20,12 +20,15 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t delaySchema = pTask->info.delaySchedParam;
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
int64_t delayParam = pTask->info.delaySchedParam;
if (delayParam != 0 && pTask->info.fillHistory == 0) {
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTaskRefId, streamTimer,
stDebug("s-task:%s refId:%" PRId64 " enable the scheduler trigger, delay:%" PRId64, pTask->id.idStr,
pTask->id.refId, delayParam);
streamTmrStart(streamTaskSchedHelper, (int32_t)delayParam, pTaskRefId, streamTimer,
&pTask->schedInfo.pDelayTimer, pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
@ -93,7 +96,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
int64_t taskRefId = *(int64_t*)param;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}
@ -129,7 +132,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
int64_t taskRefId = *(int64_t*)param;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}
@ -141,13 +144,21 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
int8_t status = atomic_load_8(&pTask->schedInfo.status);
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, jump out of schedTimer", id);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamTaskFreeRefId(param);
return;
}
if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s is paused, recheck in %.2fs", id, nextTrigger/1000.0);
streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
} else {

View File

@ -583,7 +583,7 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
streamTaskFreeRefId(param);
return;
}