refactor: do some internal refactor.
This commit is contained in:
parent
53377c2c1f
commit
13028744df
|
@ -1177,12 +1177,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
|
||||
|
||||
streamMetaSaveTask(pMeta, pTask);
|
||||
streamMetaSaveTask(pMeta, pStreamTask);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
// streamMetaRemoveTask(pMeta, pTask->id.taskId);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
if (streamMetaCommit(pTask->pMeta) < 0) {
|
||||
|
||||
if (streamMetaCommit(pTask->pMeta) < 0) {
|
||||
// persist to disk
|
||||
}
|
||||
} else {
|
||||
// todo update the chkInfo version for current task.
|
||||
|
|
|
@ -313,6 +313,69 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||
// wait for the stream task to be idle
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
while (!streamTaskIsIdle(pStreamTask)) {
|
||||
qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
|
||||
pTask->info.taskLevel, pStreamTask->id.idStr);
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
if (el > 0) {
|
||||
qDebug("s-task:%s wait for stream task:%s for %.2fs to execute all data in inputQ", pTask->id.idStr,
|
||||
pStreamTask->id.idStr, el);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||
qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
|
||||
|
||||
// todo handle stream task is dropped here
|
||||
|
||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||
|
||||
// here we need to wait for the stream task handle all data in the input queue.
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
|
||||
} else {
|
||||
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
|
||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||
}
|
||||
|
||||
// wait for the stream task to be idle
|
||||
waitForTaskTobeIdle(pTask, pStreamTask);
|
||||
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
// update the scan data range for source task.
|
||||
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
|
||||
", status:%s, sched-status:%d",
|
||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||
|
||||
// todo transfer state
|
||||
} else {
|
||||
// for sink tasks, they are continue to execute, no need to be halt.
|
||||
// the process should be stopped for a while, during the term of transfer task state.
|
||||
// OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
|
||||
qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
|
||||
|
||||
// todo transfer state
|
||||
}
|
||||
|
||||
// expand the query time window for stream scanner
|
||||
pTimeWindow->skey = INT64_MIN;
|
||||
|
||||
streamSetStatusNormal(pStreamTask);
|
||||
streamSchedExec(pStreamTask);
|
||||
streamMetaReleaseTask(pTask->pMeta, pStreamTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||
|
@ -389,56 +452,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
|
||||
if (pInput == NULL) {
|
||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||
// todo transfer task state here
|
||||
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||
qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
|
||||
|
||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||
|
||||
// here we need to wait for the stream task handle all data in the input queue.
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
|
||||
} else {
|
||||
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
|
||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||
}
|
||||
|
||||
{// wait for the stream task to be idle
|
||||
while(!streamTaskIsIdle(pStreamTask)) {
|
||||
qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
|
||||
pTask->info.taskLevel, pStreamTask->id.idStr);
|
||||
taosMsleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
// update the scan data range for source task.
|
||||
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
|
||||
", status:%s, sched-status:%d",
|
||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||
} else {
|
||||
// for sink tasks, they are continue to execute, no need to be halt.
|
||||
// the process should be stopped for a while, during the term of transfer task state.
|
||||
// OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
|
||||
|
||||
|
||||
qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
// expand the query time window for stream scanner
|
||||
pTimeWindow->skey = INT64_MIN;
|
||||
|
||||
streamSetStatusNormal(pStreamTask);
|
||||
streamMetaSaveTask(pTask->pMeta, pStreamTask);
|
||||
if (streamMetaCommit(pTask->pMeta)) {
|
||||
// persistent to disk
|
||||
}
|
||||
|
||||
streamSchedExec(pStreamTask);
|
||||
streamMetaReleaseTask(pTask->pMeta, pStreamTask);
|
||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue