fix: set nextProcessedVer properly in tqProcessTaskScanHistory

This commit is contained in:
Benguang Zhao 2023-10-16 16:36:38 +08:00
parent 5c85525fd0
commit c4e9069a66
2 changed files with 7 additions and 2 deletions

View File

@ -1200,6 +1200,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
if (nextProcessedVer == -1) {
nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1;
}
tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s",
pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus,
@ -1975,4 +1978,4 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
}

View File

@ -309,7 +309,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr);
}
ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);
ASSERT(((pStreamTask->status.taskStatus == TASK_STATUS__STOP) ||
(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;