fix(stream): handle the case that recvs drop msg twice.

This commit is contained in:
Haojun Liao 2023-12-21 11:52:45 +08:00
parent 8a8ddf5699
commit 71f6d9f06f
1 changed files with 7 additions and 3 deletions

View File

@ -1340,6 +1340,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
} }
// NOTE: here we may receive this message more than once, so need to handle this case
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont; SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont;
@ -1358,14 +1359,17 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask, NULL); ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY); if (status == TASK_STATUS__STREAM_SCAN_HISTORY) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); }
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
taosThreadMutexUnlock(&pTask->lock);
// clear the scheduler status // clear the scheduler status
streamTaskSetSchedStatusInactive(pTask); streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus); tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);