refactor(stream): refactor the pause/resume for fill history execution.
This commit is contained in:
parent
7f3d1dc464
commit
d5974a8f25
|
@ -45,7 +45,7 @@ enum {
|
||||||
TASK_STATUS__FAIL,
|
TASK_STATUS__FAIL,
|
||||||
TASK_STATUS__STOP,
|
TASK_STATUS__STOP,
|
||||||
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
|
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
|
||||||
TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
|
// TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
|
||||||
TASK_STATUS__HALT, // pause, but not be manipulated by user command
|
TASK_STATUS__HALT, // pause, but not be manipulated by user command
|
||||||
TASK_STATUS__PAUSE, // pause
|
TASK_STATUS__PAUSE, // pause
|
||||||
};
|
};
|
||||||
|
|
|
@ -1067,7 +1067,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
// 3. It's an fill history task, do nothing. wait for the main task to start it
|
// 3. It's an fill history task, do nothing. wait for the main task to start it
|
||||||
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
|
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
|
||||||
if (p != NULL) { // reset the downstreamReady flag.
|
if (p != NULL) { // reset the downstreamReady flag.
|
||||||
p->status.downstreamReady = 0;
|
|
||||||
streamTaskCheckDownstreamTasks(p);
|
streamTaskCheckDownstreamTasks(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1076,12 +1075,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
||||||
char* msg = pMsg->pCont;
|
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
|
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
|
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
|
||||||
|
@ -1089,12 +1086,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// do recovery step 1
|
// do recovery step1
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);
|
tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
if (pTask->tsInfo.step1Start == 0) {
|
||||||
|
pTask->tsInfo.step1Start = taosGetTimestampMs();
|
||||||
|
}
|
||||||
|
|
||||||
// we have to continue retrying to successfully execute the scan history task.
|
// we have to continue retrying to successfully execute the scan history task.
|
||||||
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
|
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
|
||||||
|
@ -1108,30 +1107,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.pauseAllowed == false);
|
ASSERT(pTask->status.pauseAllowed == false);
|
||||||
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
if (pTask->info.fillHistory == 1) {
|
||||||
streamTaskEnablePause(pTask);
|
streamTaskEnablePause(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
streamSourceScanHistoryData(pTask);
|
||||||
streamSourceScanHistoryData(pTask);
|
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
||||||
}
|
double el = taosGetTimestampMs() - pTask->tsInfo.step1Start;
|
||||||
|
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
|
||||||
// disable the pause when handling the step2 scan of tsdb data.
|
TASK_SCHED_STATUS__INACTIVE);
|
||||||
// the whole next procedure cann't be stopped.
|
|
||||||
// todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
|
||||||
streamTaskDisablePause(pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
// the following procedure should be executed, no matter status is stop/pause or not
|
||||||
|
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
||||||
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
|
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
|
@ -1139,77 +1129,72 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTask* pStreamTask = NULL;
|
SStreamTask* pStreamTask = NULL;
|
||||||
bool done = false;
|
bool done = false;
|
||||||
|
|
||||||
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
// if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
|
// todo delete this task, if the related stream task is dropped
|
||||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
|
||||||
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||||
|
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
|
||||||
|
|
||||||
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
|
|
||||||
// wait for the stream task get ready for scan history data
|
|
||||||
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
|
||||||
tqDebug(
|
|
||||||
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
|
|
||||||
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
// now we can stop the stream task execution
|
|
||||||
streamTaskHalt(pStreamTask);
|
|
||||||
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
|
||||||
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
|
||||||
pRange = &pTask->dataRange.range;
|
|
||||||
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
|
||||||
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
|
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
|
||||||
|
// wait for the stream task get ready for scan history data
|
||||||
|
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
|
tqDebug(
|
||||||
|
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
|
||||||
|
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we can stop the stream task execution
|
||||||
|
streamTaskHalt(pStreamTask);
|
||||||
|
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
||||||
|
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
||||||
|
|
||||||
|
// if it's an source task, extract the last version in wal.
|
||||||
|
pRange = &pTask->dataRange.range;
|
||||||
|
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
||||||
|
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||||
streamTaskEndScanWAL(pTask);
|
streamTaskEndScanWAL(pTask);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
", do secondary scan-history from WAL after halt the related stream task:%s",
|
||||||
", do secondary scan-history from WAL after halt the related stream task:%s",
|
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
|
pStreamTask->id.idStr);
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
||||||
}
|
|
||||||
|
|
||||||
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
int64_t dstVer = pTask->dataRange.range.minVer - 1;
|
||||||
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
|
|
||||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t dstVer = pTask->dataRange.range.minVer - 1;
|
pTask->chkInfo.currentVer = dstVer;
|
||||||
|
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||||
pTask->chkInfo.currentVer = dstVer;
|
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||||
tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer,
|
|
||||||
TASK_SCHED_STATUS__INACTIVE);
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
|
// set the fill-history task to be normal
|
||||||
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
streamSetStatusNormal(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
||||||
// 5. resume the related stream task.
|
// 5. resume the related stream task.
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1226,7 +1211,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pTask->historyTaskId.taskId == 0) {
|
if (pTask->historyTaskId.taskId == 0) {
|
||||||
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
|
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
|
||||||
tqDebug(
|
tqDebug(
|
||||||
"s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time "
|
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
|
||||||
"window:%" PRId64 " - %" PRId64,
|
"window:%" PRId64 " - %" PRId64,
|
||||||
id, pWindow->skey, pWindow->ekey);
|
id, pWindow->skey, pWindow->ekey);
|
||||||
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
|
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
|
||||||
|
@ -1422,7 +1407,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
// even in halt status, the data in inputQ must be processed
|
// even in halt status, the data in inputQ must be processed
|
||||||
int8_t st = pTask->status.taskStatus;
|
int8_t st = pTask->status.taskStatus;
|
||||||
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) {
|
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
||||||
pTask->chkInfo.version);
|
pTask->chkInfo.version);
|
||||||
streamProcessRunReq(pTask);
|
streamProcessRunReq(pTask);
|
||||||
|
@ -1559,7 +1544,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
|
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) {
|
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
streamStartRecoverTask(pTask, igUntreated);
|
streamStartRecoverTask(pTask, igUntreated);
|
||||||
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
|
|
|
@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
|
||||||
|
|
||||||
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
|
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
|
||||||
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
|
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
|
||||||
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64
|
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
|
||||||
", not scan wal anymore, set the transfer state flag",
|
", not scan wal anymore, set the transfer state flag",
|
||||||
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
|
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
|
||||||
pTask->status.transferState = true;
|
pTask->status.transferState = true;
|
||||||
|
@ -256,14 +256,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) {
|
if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) {
|
||||||
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
|
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
|
||||||
ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
|
// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
|
||||||
|
ASSERT(status == TASK_STATUS__NORMAL);
|
||||||
// the maximum version of data in the WAL has reached already, the step2 is done
|
// the maximum version of data in the WAL has reached already, the step2 is done
|
||||||
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
||||||
pTask->dataRange.range.maxVer);
|
pTask->dataRange.range.maxVer);
|
||||||
|
|
|
@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||||
// fill-history WAL scan has completed
|
// fill-history WAL scan has completed
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskRecoverSetAllStepFinished(pTask);
|
||||||
streamTaskEndScanWAL(pTask);
|
streamTaskEndScanWAL(pTask);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue