fix(stream): fix error in pause scan-history data.

This commit is contained in:
Haojun Liao 2023-09-26 01:09:55 +08:00
parent 6a98b11bef
commit feb09c8361
3 changed files with 42 additions and 32 deletions

View File

@ -698,7 +698,6 @@ int32_t streamSetStatusUnint(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskHalt(SStreamTask* pTask);
void streamTaskResumeFromHalt(SStreamTask* pTask); void streamTaskResumeFromHalt(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask);

View File

@ -1135,8 +1135,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
if (status == TASK_STATUS__HALT) { if (status == TASK_STATUS__HALT) {
// return; // tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
// do nothing // pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
//
// taosThreadMutexUnlock(&pStreamTask->lock);
// break;
} }
if (pStreamTask->status.taskStatus == TASK_STATUS__CK) { if (pStreamTask->status.taskStatus == TASK_STATUS__CK) {
@ -1152,7 +1156,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
streamGetTaskStatusStr(TASK_STATUS__PAUSE)); streamGetTaskStatusStr(TASK_STATUS__PAUSE));
} else { } else {
qDebug("s-task:%s halt task", pStreamTask->id.idStr); qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status));
} }
pStreamTask->status.keepTaskStatus = status; pStreamTask->status.keepTaskStatus = status;
@ -1174,6 +1178,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->execInfo.step2Start = taosGetTimestampMs(); pTask->execInfo.step2Start = taosGetTimestampMs();
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask); streamTaskPutTranstateIntoInputQ(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
}
streamTryExec(pTask); // exec directly streamTryExec(pTask); // exec directly
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;

View File

@ -1001,34 +1001,34 @@ void streamTaskEnablePause(SStreamTask* pTask) {
} }
// fix: this function should be removed, it may cause deadlock. // fix: this function should be removed, it may cause deadlock.
void streamTaskHalt(SStreamTask* pTask) { //void streamTaskHalt(SStreamTask* pTask) {
int8_t status = pTask->status.taskStatus; // int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { // if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
return; // return;
} // }
//
if (status == TASK_STATUS__HALT) { // if (status == TASK_STATUS__HALT) {
return; // return;
} // }
//
// wait for checkpoint completed // // wait for checkpoint completed
while(pTask->status.taskStatus == TASK_STATUS__CK) { // while(pTask->status.taskStatus == TASK_STATUS__CK) {
qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, // qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr,
streamGetTaskStatusStr(TASK_STATUS__CK)); // streamGetTaskStatusStr(TASK_STATUS__CK));
taosMsleep(1000); // taosMsleep(1000);
} // }
//
// upgrade to halt status // // upgrade to halt status
if (status == TASK_STATUS__PAUSE) { // if (status == TASK_STATUS__PAUSE) {
stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), // stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
streamGetTaskStatusStr(TASK_STATUS__PAUSE)); // streamGetTaskStatusStr(TASK_STATUS__PAUSE));
} else { // } else {
stDebug("s-task:%s halt task", pTask->id.idStr); // stDebug("s-task:%s halt task", pTask->id.idStr);
} // }
//
pTask->status.keepTaskStatus = status; // pTask->status.keepTaskStatus = status;
pTask->status.taskStatus = TASK_STATUS__HALT; // pTask->status.taskStatus = TASK_STATUS__HALT;
} //}
void streamTaskResumeFromHalt(SStreamTask* pTask) { void streamTaskResumeFromHalt(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;