fix(stream): fix error during transferring executor state, while a task is not in normal status.

This commit is contained in:
Haojun Liao 2023-07-06 18:34:01 +08:00
parent 85782bbff9
commit 4f814db5d5
1 changed files with 12 additions and 5 deletions

View File

@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed",
pTask->id.idStr, pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else {
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
// todo handle stream task is dropped here
}
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return 0;
}
}
break;
@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask);
if (code < 0) {
if (code < 0) { // todo this status shoudl be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;
}