Merge pull request #22174 from taosdata/fix/3_liaohj

fix(stream): fix the invalid check of step2 .
This commit is contained in:
Haojun Liao 2023-07-25 00:30:06 +08:00 committed by GitHub
commit 96722a07b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 8 additions and 10 deletions

View File

@ -598,7 +598,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);

View File

@ -1154,16 +1154,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100); taosMsleep(100);
} }
streamTaskHalt(pTask);
// now we can stop the stream task execution // now we can stop the stream task execution
// todo upgrade the statu to be HALT from PAUSE or NORMAL streamTaskHalt(pStreamTask);
pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel,
tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr, id);
pStreamTask->info.taskLevel, id);
// if it's an source task, extract the last version in wal. // if it's an source task, extract the last version in wal.
streamHistoryTaskSetVerRangeStep2(pTask); pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
} }
if (!streamTaskRecoverScanStep1Finished(pTask)) { if (!streamTaskRecoverScanStep1Finished(pTask)) {

View File

@ -654,9 +654,8 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
return qStreamRecoverSetAllStepFinished(exec); return qStreamRecoverSetAllStepFinished(exec);
} }
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) { void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
SVersionRange* pRange = &pTask->dataRange.range; SVersionRange* pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer); ASSERT(latestVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1; int64_t nextStartVer = pRange->maxVer + 1;