fix:[TD-23972] push subscribe msg to vnode even though consumer not change

This commit is contained in:
wangmm0220 2023-05-06 11:06:41 +08:00
parent 55eddbfb5e
commit 446097434e
1 changed files with 8 additions and 1 deletions

View File

@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg* pMsg, STqOffsetVal* pOffset) { SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId; uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int code = 0;
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) { if(code != 0) {
goto end; goto end;
} }