fix(stream): transfer the state for agg tasks.
This commit is contained in:
parent
d43cb3fcdf
commit
3bcc7ed83e
|
@ -293,32 +293,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!pTask->status.transferState) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
streamTaskFillHistoryFinished(pTask);
|
||||
streamTaskEndScanWAL(pTask);
|
||||
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
|
||||
code = streamDoTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask == NULL) {
|
||||
// todo: destroy this task here
|
||||
// todo: destroy the fill-history task here
|
||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
||||
pTask->streamTaskId.taskId);
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
|
@ -359,34 +339,36 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
// expand the query time window for stream scanner
|
||||
// 1. expand the query time window for stream task of WAL scanner
|
||||
pTimeWindow->skey = INT64_MIN;
|
||||
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
||||
|
||||
// transfer the ownership of executor state
|
||||
// 2. transfer the ownership of executor state
|
||||
streamTaskReleaseState(pTask);
|
||||
streamTaskReloadState(pStreamTask);
|
||||
|
||||
// clear the link between fill-history task and stream task info
|
||||
// 3. clear the link between fill-history task and stream task info
|
||||
pStreamTask->historyTaskId.taskId = 0;
|
||||
|
||||
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
|
||||
// pause, since the pause allowed attribute is not set yet.
|
||||
streamTaskResumeFromHalt(pStreamTask);
|
||||
|
||||
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
|
||||
// free it and remove it from disk meta-store
|
||||
// 5. free it and remove fill-history task from disk meta-store
|
||||
streamMetaUnregisterTask(pMeta, taskId);
|
||||
|
||||
// save to disk
|
||||
// 6. save to disk
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
|
||||
streamMetaSaveTask(pMeta, pStreamTask);
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
}
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
|
||||
// pause allowed
|
||||
// 7. pause allowed.
|
||||
streamTaskEnablePause(pStreamTask);
|
||||
|
||||
streamSchedExec(pStreamTask);
|
||||
|
@ -394,6 +376,25 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!pTask->status.transferState) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
streamTaskFillHistoryFinished(pTask);
|
||||
streamTaskEndScanWAL(pTask);
|
||||
} else { // do transfer task operator states.
|
||||
code = streamDoTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
const char* id) {
|
||||
int32_t retryTimes = 0;
|
||||
|
@ -547,17 +548,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
|
|||
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||
|
||||
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
|
||||
pTask->status.transferState = true;
|
||||
|
||||
// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
|
||||
int32_t code = streamDispatchTransferStateMsg(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
// the last execution of fill-history task, in order to transfer task operator states.
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||
// 2. do transfer stream task operator states.
|
||||
pTask->status.transferState = true;
|
||||
code = streamDoTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -582,9 +582,11 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
// todo the task should be commit here
|
||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||
// fill-history WAL scan has completed
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
|
||||
streamTaskFillHistoryFinished(pTask);
|
||||
streamTaskEndScanWAL(pTask);
|
||||
if (pTask->status.transferState) {
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
|
|
Loading…
Reference in New Issue