fix(stream): ignore the related stream task destory msg in transfer state.

This commit is contained in:
Haojun Liao 2023-08-14 10:58:24 +08:00
parent 98f40325e9
commit 2806fe1c56
3 changed files with 22 additions and 18 deletions

View File

@ -1527,7 +1527,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return TSDB_CODE_SUCCESS;
} else { } else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;

View File

@ -758,10 +758,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask); code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); // atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code;
} }
return TSDB_CODE_SUCCESS;
} }
pTask->msgInfo.retryCount = 0; pTask->msgInfo.retryCount = 0;

View File

@ -292,9 +292,20 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy the fill-history task here qError(
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
pTask->streamTaskId.taskId); "fill-history task",
pTask->id.idStr, pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// 2. save to disk
taosWLockLatch(&pMeta->lock);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else { } else {
qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
@ -334,9 +345,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
} }
// todo check the output queue for fill-history task, and wait for it complete
// 1. expand the query time window for stream task of WAL scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
@ -390,15 +398,10 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask); streamTaskFillHistoryFinished(pTask);
}
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
} }
return code; return code;
@ -522,6 +525,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
} }
// agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
pBlock->srcVgId = pTask->pMeta->vgId; pBlock->srcVgId = pTask->pMeta->vgId;
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);