fix(stream): fix bugs caused by refactor sm.

This commit is contained in:
Haojun Liao 2023-10-20 14:19:57 +08:00
parent d75948160d
commit 3b8c85f632
4 changed files with 26 additions and 49 deletions

View File

@ -717,6 +717,7 @@ int32_t streamSchedExec(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamTask* pStatus);
bool streamTaskShouldPause(const SStreamTask* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr);
@ -758,8 +759,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResumeFromHalt(SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);

View File

@ -1365,8 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
ETaskStatus st = streamTaskGetStatus(pTask, &p);
if (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
if (streamTaskReadyToRun(pTask, &p)) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
@ -1515,7 +1514,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
return -1;
}
streamTaskResume(pTask, pTq->pStreamMeta);
streamTaskResume(pTask);
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
@ -1874,6 +1873,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);

View File

@ -352,7 +352,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask); // todo refactor: use streamTaskResume.
streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume.
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
@ -610,6 +610,12 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
status == TASK_STATUS__DROPPING);
}
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
ETaskStatus st = streamTaskGetStatus(pTask, NULL);
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__STREAM_SCAN_HISTORY ||
st == TASK_STATUS__CK);
}
int32_t streamExecTask(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required.
const char* id = pTask->id.idStr;

View File

@ -1021,36 +1021,25 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
}
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
void streamTaskResume(SStreamTask* pTask) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
SStreamMeta* pMeta = pTask->pMeta;
if (status == TASK_STATUS__PAUSE) {
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
streamTaskGetStatus(pTask, &p);
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr,
p, num);
char* pNew = NULL;
streamTaskGetStatus(pTask, &pNew);
if (status == TASK_STATUS__PAUSE) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, p, num);
} else {
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, p);
}
} else {
stDebug("s-task:%s status:%s not in pause status, no need to resume", pTask->id.idStr, p);
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, p);
}
#if 0
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__READY;
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else {
stError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
}
#endif
}
// todo fix race condition
@ -1071,24 +1060,6 @@ void streamTaskEnablePause(SStreamTask* pTask) {
pTask->status.pauseAllowed = 1;
}
void streamTaskResumeFromHalt(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
char* p = NULL;
ASSERT(streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT);
// int8_t status = pTask->status.taskStatus;
// if (status != TASK_STATUS__HALT) {
// stError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status));
// return;
// }
// pTask->status.taskStatus = pTask->status.keepTaskStatus;
// pTask->status.keepTaskStatus = TASK_STATUS__READY;
streamTaskRestoreStatus(pTask);
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s resume from halt, current status:%s", id, p);
}
int32_t updateTaskReadyInMeta(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;