fix(stream): set downstream task ready state.
This commit is contained in:
parent
5f6afc6661
commit
9513f76172
|
@ -1137,14 +1137,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// wait for the stream task get ready for scan history data
|
// wait for the stream task get ready for scan history data
|
||||||
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
|
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
|
||||||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId,
|
tqDebug(
|
||||||
pTask->info.taskLevel, pId);
|
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
|
||||||
|
pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we can stop the stream task execution
|
// now we can stop the stream task execution
|
||||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||||
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId,
|
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
|
||||||
pStreamTask->info.taskLevel, pId);
|
pStreamTask->info.taskLevel, pId);
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
|
|
|
@ -201,6 +201,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
taosArrayDestroy(pTask->checkReqIds);
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
pTask->checkReqIds = NULL;
|
pTask->checkReqIds = NULL;
|
||||||
|
pTask->status.downstreamReady = 1;
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
|
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
|
||||||
|
|
Loading…
Reference in New Issue