enh: refact tmq pull message
This commit is contained in:
parent
6d774dee4e
commit
c66390d401
|
@ -2948,6 +2948,10 @@ typedef struct {
|
|||
STqOffsetVal reqOffset;
|
||||
} SMqPollReq;
|
||||
|
||||
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
|
||||
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t offset;
|
||||
|
|
|
@ -1461,12 +1461,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
|||
return code;
|
||||
}
|
||||
|
||||
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
|
||||
if (pReq == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
/*strcpy(pReq->topic, pTopic->topicName);*/
|
||||
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
||||
|
||||
|
@ -1485,9 +1480,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
|||
|
||||
pReq->useSnapshot = tmq->useSnapshot;
|
||||
|
||||
pReq->head.vgId = htonl(pVg->vgId);
|
||||
pReq->head.contLen = htonl(sizeof(SMqPollReq));
|
||||
return pReq;
|
||||
pReq->head.vgId = pVg->vgId;
|
||||
}
|
||||
|
||||
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
|
@ -1559,15 +1552,32 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
#endif
|
||||
}
|
||||
atomic_store_32(&pVg->vgSkipCnt, 0);
|
||||
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
|
||||
if (pReq == NULL) {
|
||||
|
||||
SMqPollReq req = {0};
|
||||
tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
|
||||
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
char *msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
||||
taosMemoryFree(msg);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||
if (pParam == NULL) {
|
||||
taosMemoryFree(pReq);
|
||||
taosMemoryFree(msg);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
|
@ -1581,7 +1591,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
|
||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
taosMemoryFree(pReq);
|
||||
taosMemoryFree(msg);
|
||||
taosMemoryFree(pParam);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -1589,11 +1599,11 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
}
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = pReq,
|
||||
.len = sizeof(SMqPollReq),
|
||||
.pData = msg,
|
||||
.len = msgSize,
|
||||
.handle = NULL,
|
||||
};
|
||||
sendInfo->requestId = pReq->reqId;
|
||||
sendInfo->requestId = req.reqId;
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqPollCb;
|
||||
|
@ -1605,7 +1615,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
char offsetFormatBuf[80];
|
||||
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
|
||||
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
pVg->pollCnt++;
|
||||
|
|
|
@ -4766,7 +4766,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
|
||||
if (buf != NULL) {
|
||||
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||
pHead->vgId = htonl(pReq->header.vgId);
|
||||
pHead->vgId = htonl(pReq->head.vgId);
|
||||
pHead->contLen = htonl(tlen + headLen);
|
||||
}
|
||||
|
||||
|
@ -4777,8 +4777,8 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
int32_t headLen = sizeof(SMsgHead);
|
||||
|
||||
SMsgHead *pHead = buf;
|
||||
pHead->vgId = pReq->header.vgId;
|
||||
pHead->contLen = pReq->header.contLen;
|
||||
pHead->vgId = pReq->head.vgId;
|
||||
pHead->contLen = pReq->head.contLen;
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||
|
|
|
@ -458,20 +458,26 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
|||
}
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SMqPollReq* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int32_t reqEpoch = pReq->epoch;
|
||||
SMqPollReq req = {0};
|
||||
int32_t code = 0;
|
||||
STqOffsetVal reqOffset = pReq->reqOffset;
|
||||
STqOffsetVal fetchOffsetNew;
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
|
||||
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t consumerId = req.consumerId;
|
||||
int32_t reqEpoch = req.epoch;
|
||||
STqOffsetVal reqOffset = req.reqOffset;
|
||||
|
||||
// 1.find handle
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
/*ASSERT(pHandle);*/
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
||||
TD_VID(pTq->pVnode), pReq->subKey);
|
||||
TD_VID(pTq->pVnode), req.subKey);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -479,7 +485,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (pHandle->consumerId != consumerId) {
|
||||
tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
|
||||
", in vgId:%d, subkey %s, handle consumer id %" PRId64,
|
||||
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
|
||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
return -1;
|
||||
}
|
||||
|
@ -493,13 +499,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
char buf[80];
|
||||
tFormatOffset(buf, 80, &reqOffset);
|
||||
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||
|
||||
// 2.reset offset if needed
|
||||
if (reqOffset.type > 0) {
|
||||
fetchOffsetNew = reqOffset;
|
||||
} else {
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
|
||||
if (pOffset != NULL) {
|
||||
fetchOffsetNew = pOffset->val;
|
||||
char formatBuf[80];
|
||||
|
@ -508,7 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
TD_VID(pTq->pVnode), formatBuf);
|
||||
} else {
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||
if (pReq->useSnapshot) {
|
||||
if (req.useSnapshot) {
|
||||
if (pHandle->fetchMeta) {
|
||||
tqOffsetResetToMeta(&fetchOffsetNew, 0);
|
||||
} else {
|
||||
|
@ -520,21 +526,21 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
||||
|
||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
|
||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
} else {
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, pReq);
|
||||
tqInitTaosxRsp(&taosxRsp, &req);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
@ -543,7 +549,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
|
||||
" in vg %d, subkey %s, reset none failed",
|
||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
|
||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
|
||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||
return -1;
|
||||
}
|
||||
|
@ -552,7 +558,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
||||
// lock
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
||||
|
@ -580,7 +586,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
#endif
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
|
||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
|
||||
|
@ -599,13 +605,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SMqMetaRsp metaRsp = {0};
|
||||
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, pReq);
|
||||
tqInitTaosxRsp(&taosxRsp, &req);
|
||||
|
||||
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||
tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
||||
|
@ -618,7 +624,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
if (taosxRsp.blockNum > 0) {
|
||||
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
@ -648,13 +654,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (consumerEpoch > reqEpoch) {
|
||||
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
||||
", found new consumer epoch %d, discard req epoch %d",
|
||||
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
||||
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
||||
break;
|
||||
}
|
||||
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
@ -665,7 +671,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SWalCont* pHead = &pCkHead->head;
|
||||
|
||||
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||
pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
|
||||
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
|
||||
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||
|
@ -675,7 +681,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
if (taosxRsp.blockNum > 0 /* threshold */) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
@ -693,7 +699,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
metaRsp.resMsgType = pHead->msgType;
|
||||
metaRsp.metaRspLen = pHead->bodyLen;
|
||||
metaRsp.metaRsp = pHead->body;
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
|
Loading…
Reference in New Issue