fix(stream): fix error in pause scan-history data.
This commit is contained in:
parent
5a4a0aa0e3
commit
6cc3618ed9
|
@ -698,7 +698,6 @@ int32_t streamSetStatusUnint(SStreamTask* pTask);
|
||||||
const char* streamGetTaskStatusStr(int32_t status);
|
const char* streamGetTaskStatusStr(int32_t status);
|
||||||
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
||||||
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
|
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
|
||||||
void streamTaskHalt(SStreamTask* pTask);
|
|
||||||
void streamTaskResumeFromHalt(SStreamTask* pTask);
|
void streamTaskResumeFromHalt(SStreamTask* pTask);
|
||||||
void streamTaskDisablePause(SStreamTask* pTask);
|
void streamTaskDisablePause(SStreamTask* pTask);
|
||||||
void streamTaskEnablePause(SStreamTask* pTask);
|
void streamTaskEnablePause(SStreamTask* pTask);
|
||||||
|
|
|
@ -1144,8 +1144,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == TASK_STATUS__HALT) {
|
if (status == TASK_STATUS__HALT) {
|
||||||
// return;
|
// tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
||||||
// do nothing
|
// pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
||||||
|
// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
||||||
|
//
|
||||||
|
// taosThreadMutexUnlock(&pStreamTask->lock);
|
||||||
|
// break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStreamTask->status.taskStatus == TASK_STATUS__CK) {
|
if (pStreamTask->status.taskStatus == TASK_STATUS__CK) {
|
||||||
|
@ -1161,7 +1165,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
|
qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
|
||||||
streamGetTaskStatusStr(TASK_STATUS__PAUSE));
|
streamGetTaskStatusStr(TASK_STATUS__PAUSE));
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s halt task", pStreamTask->id.idStr);
|
qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
}
|
}
|
||||||
|
|
||||||
pStreamTask->status.keepTaskStatus = status;
|
pStreamTask->status.keepTaskStatus = status;
|
||||||
|
@ -1183,6 +1187,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
||||||
streamTaskPutTranstateIntoInputQ(pTask);
|
streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
|
|
||||||
|
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
||||||
|
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
|
||||||
|
qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
|
||||||
|
streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
|
||||||
|
}
|
||||||
|
|
||||||
streamTryExec(pTask); // exec directly
|
streamTryExec(pTask); // exec directly
|
||||||
} else {
|
} else {
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
|
|
|
@ -1001,34 +1001,34 @@ void streamTaskEnablePause(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// fix: this function should be removed, it may cause deadlock.
|
// fix: this function should be removed, it may cause deadlock.
|
||||||
void streamTaskHalt(SStreamTask* pTask) {
|
//void streamTaskHalt(SStreamTask* pTask) {
|
||||||
int8_t status = pTask->status.taskStatus;
|
// int8_t status = pTask->status.taskStatus;
|
||||||
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
|
// if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
if (status == TASK_STATUS__HALT) {
|
// if (status == TASK_STATUS__HALT) {
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// wait for checkpoint completed
|
// // wait for checkpoint completed
|
||||||
while(pTask->status.taskStatus == TASK_STATUS__CK) {
|
// while(pTask->status.taskStatus == TASK_STATUS__CK) {
|
||||||
qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
|
// qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
|
||||||
streamGetTaskStatusStr(TASK_STATUS__CK));
|
// streamGetTaskStatusStr(TASK_STATUS__CK));
|
||||||
taosMsleep(1000);
|
// taosMsleep(1000);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// upgrade to halt status
|
// // upgrade to halt status
|
||||||
if (status == TASK_STATUS__PAUSE) {
|
// if (status == TASK_STATUS__PAUSE) {
|
||||||
stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
|
// stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
|
||||||
streamGetTaskStatusStr(TASK_STATUS__PAUSE));
|
// streamGetTaskStatusStr(TASK_STATUS__PAUSE));
|
||||||
} else {
|
// } else {
|
||||||
stDebug("s-task:%s halt task", pTask->id.idStr);
|
// stDebug("s-task:%s halt task", pTask->id.idStr);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
pTask->status.keepTaskStatus = status;
|
// pTask->status.keepTaskStatus = status;
|
||||||
pTask->status.taskStatus = TASK_STATUS__HALT;
|
// pTask->status.taskStatus = TASK_STATUS__HALT;
|
||||||
}
|
//}
|
||||||
|
|
||||||
void streamTaskResumeFromHalt(SStreamTask* pTask) {
|
void streamTaskResumeFromHalt(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
Loading…
Reference in New Issue