refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-07-20 16:31:46 +08:00
parent 45cb478b31
commit c9fa170e65
3 changed files with 35 additions and 74 deletions

View File

@ -1066,8 +1066,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step 1
const char* id = pTask->id.idStr;
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);
int64_t st = taosGetTimestampMs();
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
@ -1112,6 +1112,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// todo remove this
// wait for the stream task get ready for scan history data
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
@ -1168,20 +1169,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// 5. resume the related stream task.
streamTryExec(pTask);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s scan-history-task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask);
streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pStreamTask);
taosWLockLatch(&pMeta->lock);
if (streamMetaCommit(pTask->pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
} else {
// todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of
@ -1241,22 +1230,9 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
}
// transfer the ownership of executor state
tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr);
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
// related stream task load the state from the state storage backend
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
return -1;
}
// when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure.
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask);
ASSERT(pTask->streamTaskId.taskId != 0);
pTask->status.transferState = true;
streamSchedExec(pTask);
@ -1366,7 +1342,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__SCAN_HISTORY) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version);
streamProcessRunReq(pTask);

View File

@ -351,30 +351,36 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
}
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed",
pTask->id.idStr, pTask->streamTaskId.taskId);
pTask->status.transferState = false; // reset this value, to avoid transfer state again
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else {
qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
pStreamTask->id.idStr);
}
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2. For a agg task
int8_t status = pStreamTask->status.taskStatus;
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
ASSERT(status == TASK_STATUS__HALT);
} else {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
ASSERT(status == TASK_STATUS__SCAN_HISTORY);
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
}
// wait for the stream task to be idle
// wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to be halted for them.
@ -399,10 +405,23 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// reset the status of stream task
streamSetStatusNormal(pStreamTask);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pTask->pMeta, pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_SUCCESS;
}
@ -480,7 +499,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask);
pTask->status.transferState = false; // reset this value, to avoid transfer state again
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return 0;
}

View File

@ -62,10 +62,6 @@ const char* streamGetTaskStatusStr(int32_t status) {
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
// qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
// pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
//
streamSetParamForScanHistory(pTask);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
@ -84,12 +80,10 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
walReaderGetCurrentVer(pTask->exec.pWalReader));
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask);
streamSetParamForScanHistory(pTask);
streamAggScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
}
return 0;
@ -145,7 +139,6 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
streamTaskSetForReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask);
launchFillHistoryTask(pTask);
}
@ -759,32 +752,6 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
return;
}
// calculate the correct start time window, and start the handle the history data for the main task.
/* if (pTask->historyTaskId.taskId != 0) {
// check downstream tasks for associated scan-history-data tasks
streamLaunchFillHistoryTask(pTask);
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
} else {
SHistDataRange* pRange = &pTask->dataRange;
qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
" - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}*/
ASSERT(pTask->status.downstreamReady == 0);
// check downstream tasks for itself