fix(stream): drop related fill-history task when dropping stream tasks.
This commit is contained in:
parent
83f84d92bc
commit
b0a4ed3217
|
@ -247,7 +247,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
||||||
// tq-stream
|
// tq-stream
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -1406,20 +1406,36 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
|
|
||||||
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
||||||
|
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
if (pTask != NULL) {
|
||||||
|
// drop the related fill-history task firstly
|
||||||
|
if (pTask->hTaskInfo.id.taskId != 0) {
|
||||||
|
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||||
|
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||||
|
tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
// drop the stream task now
|
||||||
|
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
||||||
// commit the update
|
// commit the update
|
||||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks);
|
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
||||||
|
|
||||||
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -567,7 +567,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_DROP: {
|
case TDMT_STREAM_TASK_DROP: {
|
||||||
if (tqProcessTaskDropReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
|
Loading…
Reference in New Issue