fix(stream):add more check for test cases.

This commit is contained in:
Haojun Liao 2023-10-16 18:47:01 +08:00
parent 8f2c1944c0
commit a1aa2c9e0c
2 changed files with 7 additions and 1 deletions

View File

@ -1196,7 +1196,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->status.keepTaskStatus = status; pStreamTask->status.keepTaskStatus = status;
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
// wal scan not start yet, reset it to be the start position
nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
if (nextProcessedVer == -1) {
nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1;
}
tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s", tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s",
pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus, pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus,

View File

@ -309,7 +309,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pStreamTask->id.idStr); pStreamTask->id.idStr);
} }
ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); ASSERT(((pStreamTask->status.taskStatus == TASK_STATUS__STOP) ||
(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) &&
pTask->status.appendTranstateBlock == true);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;