fix(stream): check if stream task is in normal state.
This commit is contained in:
parent
da6a4de021
commit
2e386a6fdf
|
@ -1106,7 +1106,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
// todo handle error
|
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s",
|
||||||
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
|
tqDebug("s-task:%s scan-history-task set status to be dropping", pId);
|
||||||
|
|
||||||
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
Loading…
Reference in New Issue