fix(stream): check more status when handling the state transfer.
This commit is contained in:
parent
db8ce3de3b
commit
17c6143471
|
@ -1277,7 +1277,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (done) {
|
||||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||
streamTaskEndScanWAL(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
} else {
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||
|
@ -1303,13 +1302,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
streamSetStatusNormal(pTask);
|
||||
}
|
||||
|
||||
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
||||
// 5. resume the related stream task.
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
|
||||
tqStartStreamTasks(pTq);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
} else {
|
||||
// todo update the chkInfo version for current task.
|
||||
// this task has an associated history stream task, so we need to scan wal from the end version of
|
||||
|
@ -1515,7 +1512,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (pTask != NULL) {
|
||||
// even in halt status, the data in inputQ must be processed
|
||||
int8_t st = pTask->status.taskStatus;
|
||||
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) {
|
||||
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY) {
|
||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
||||
pTask->chkInfo.version);
|
||||
streamProcessRunReq(pTask);
|
||||
|
@ -1528,8 +1525,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
tqStartStreamTasks(pTq);
|
||||
return 0;
|
||||
} else {
|
||||
tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId);
|
||||
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
||||
// todo add one function to handle this
|
||||
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1046,7 +1046,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
|
||||
|
||||
qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64,
|
||||
qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
|
||||
|
||||
pWindow->skey = INT64_MIN;
|
||||
|
|
|
@ -545,8 +545,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
|
||||
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
|
||||
bool streamTaskIsIdle(const SStreamTask* pTask) {
|
||||
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
|
||||
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP ||
|
||||
pTask->status.taskStatus == TASK_STATUS__DROPPING);
|
||||
}
|
||||
|
||||
int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
|
||||
|
@ -584,7 +587,6 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// todo the task should be commit here
|
||||
// todo the task should be commit here
|
||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||
// fill-history WAL scan has completed
|
||||
|
|
|
@ -330,7 +330,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
|
|||
}
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
|
||||
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
|
||||
qDebug("s-task:0x%x set task status:%s and start to unregister it", taskId,
|
||||
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
|
||||
|
||||
while (1) {
|
||||
taosRLockLatch(&pMeta->lock);
|
||||
|
|
Loading…
Reference in New Issue