refactor: do some internal refactor.
This commit is contained in:
parent
fb1e017363
commit
51e0b903b0
|
@ -21,6 +21,7 @@
|
||||||
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
||||||
|
|
||||||
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
||||||
|
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
|
||||||
|
|
||||||
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
||||||
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
|
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
|
||||||
|
@ -357,6 +358,26 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (!pTask->status.transferState) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t level = pTask->info.taskLevel;
|
||||||
|
if (level == TASK_LEVEL__SOURCE) {
|
||||||
|
streamTaskFillHistoryFinished(pTask);
|
||||||
|
streamTaskEndScanWAL(pTask);
|
||||||
|
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
|
||||||
|
code = streamDoTransferStateToStreamTask(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
|
@ -621,12 +642,16 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo the task should be commit here
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||||
// fill-history WAL scan has completed
|
// fill-history WAL scan has completed
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
|
if (pTask->status.transferState) {
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
code = streamTransferStateToStreamTask(pTask);
|
||||||
streamTaskEndScanWAL(pTask);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
streamSchedExec(pTask);
|
||||||
} else {
|
} else {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
|
|
Loading…
Reference in New Issue