fix(stream): keep the original tsdb scan version range.
This commit is contained in:
parent
87dfc1f931
commit
1ac192c069
|
@ -445,6 +445,7 @@ struct SStreamTask {
|
||||||
SCheckpointInfo chkInfo;
|
SCheckpointInfo chkInfo;
|
||||||
STaskExec exec;
|
STaskExec exec;
|
||||||
SDataRange dataRange;
|
SDataRange dataRange;
|
||||||
|
SVersionRange step2Range;
|
||||||
SHistoryTaskInfo hTaskInfo;
|
SHistoryTaskInfo hTaskInfo;
|
||||||
STaskId streamTaskId;
|
STaskId streamTaskId;
|
||||||
STaskExecStatisInfo execInfo;
|
STaskExecStatisInfo execInfo;
|
||||||
|
|
|
@ -1541,7 +1541,12 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
// history_task_id
|
// history_task_id
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
if (pe->hTaskId != 0) {
|
if (pe->hTaskId != 0) {
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false);
|
memset(idstr, 0, tListLen(idstr));
|
||||||
|
len = tintToHex(pe->hTaskId, &idstr[4]);
|
||||||
|
idstr[2] = '0';
|
||||||
|
idstr[3] = 'x';
|
||||||
|
varDataSetLen(idstr, len + 2);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfo, numOfRows, 0, true);
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -800,33 +800,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
|
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
||||||
SVersionRange* pRange = &pTask->dataRange.range;
|
SVersionRange* pStep2Range = &pTask->step2Range;
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer,
|
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer,
|
||||||
pRange->maxVer, 0.0);
|
pStep2Range->maxVer, 0.0);
|
||||||
streamTaskPutTranstateIntoInputQ(pTask);
|
streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
streamExecTask(pTask); // exec directly
|
streamExecTask(pTask); // exec directly
|
||||||
} else {
|
} else {
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
", do secondary scan-history from WAL after halt the related stream task:%s",
|
", do secondary scan-history from WAL after halt the related stream task:%s",
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
|
||||||
pStreamTask->id.idStr);
|
pStreamTask->id.idStr);
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
|
||||||
|
|
||||||
int64_t dstVer = pTask->dataRange.range.minVer;
|
int64_t dstVer =pStep2Range->minVer;
|
||||||
pTask->chkInfo.nextProcessVer = dstVer;
|
pTask->chkInfo.nextProcessVer = dstVer;
|
||||||
|
|
||||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||||
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||||
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
|
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
|
||||||
|
|
|
@ -242,21 +242,23 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
||||||
// todo handle memory error
|
// todo handle memory error
|
||||||
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
|
bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int64_t maxVer = pTask->dataRange.range.maxVer;
|
int64_t maxVer = pTask->step2Range.maxVer;
|
||||||
|
|
||||||
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
|
if ((pTask->info.fillHistory == 1) && ver > maxVer) {
|
||||||
if (!pTask->status.appendTranstateBlock) {
|
if (!pTask->status.appendTranstateBlock) {
|
||||||
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
|
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
|
||||||
", not scan wal anymore, add transfer-state block into inputQ",
|
", not scan wal anymore, add transfer-state block into inputQ",
|
||||||
id, ver, maxVer);
|
id, ver, maxVer);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs",
|
||||||
|
id, pTask->step2Range.minVer, maxVer, el);
|
||||||
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
|
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
|
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64
|
||||||
id, ver, maxVer);
|
", not scan wal",
|
||||||
|
id, ver, pTask->step2Range.minVer, maxVer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,7 +391,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
||||||
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
|
|
@ -868,8 +868,10 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
|
||||||
} else {
|
} else {
|
||||||
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
|
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
|
||||||
// [pTask->dataRange.range.maxVer, ver1]
|
// [pTask->dataRange.range.maxVer, ver1]
|
||||||
pRange->minVer = walScanStartVer;
|
pTask->step2Range.minVer = walScanStartVer;
|
||||||
pRange->maxVer = nextProcessVer - 1;
|
pTask->step2Range.maxVer = nextProcessVer - 1;
|
||||||
|
stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr,
|
||||||
|
pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue