fix(stream): fix error in drop task.

This commit is contained in:
Haojun Liao 2023-09-15 00:51:13 +08:00
parent 73c2782d78
commit 117de7ab7d
2 changed files with 2 additions and 8 deletions

View File

@ -1330,14 +1330,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0;
}
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }

View File

@ -458,7 +458,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
if (pInfo->nodeId == nodeId) { if (pInfo->nodeId == nodeId) {
epsetAssign(&pInfo->epSet, pEpSet); epsetAssign(&pInfo->epSet, pEpSet);
qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf); qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d taskId:0x%x newEpset:%s", pTask->id.taskId, nodeId,
pInfo->taskId, buf);
break; break;
} }
} }