fix:send rsp offset to client if unregister push mgr

This commit is contained in:
wangmm0220 2023-07-06 19:35:48 +08:00
parent b2d41ce907
commit 0928bd5510
4 changed files with 41 additions and 20 deletions

View File

@ -1928,7 +1928,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// update the local offset value only for the returned values, only when the local offset is NOT updated
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set, rsp offset:%d,%"PRId64, tmq->consumerId, pDataRsp->rspOffset.type, pDataRsp->rspOffset.version);
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);

View File

@ -151,7 +151,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);

View File

@ -232,26 +232,43 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
return 0;
}
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pHandle->msg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId;
dataRsp.head.epoch = pHandle->epoch;
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
ever);
char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
tqInitDataRsp(&dataRsp, &req);
dataRsp.blockNum = 0;
dataRsp.rspOffset = dataRsp.reqOffset;
tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
return 0;
}
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
// SMqDataRsp dataRsp = {0};
// dataRsp.head.consumerId = pHandle->consumerId;
// dataRsp.head.epoch = pHandle->epoch;
// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
// ever);
//
// char buf1[TSDB_OFFSET_LEN] = {0};
// char buf2[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
// return 0;
//}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId) {
int64_t sver = 0, ever = 0;

View File

@ -64,7 +64,9 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
} else {
tqPushDataRsp(pHandle, vgId);
// tqPushDataRsp(pHandle, vgId);
tqPushEmptyDataRsp(pHandle, vgId);
void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;
@ -89,7 +91,8 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pHandle, vgId);
// tqPushDataRsp(pHandle, vgId);
tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg);