fix(tmq): offset
This commit is contained in:
parent
0f0fe6276a
commit
e74187248b
|
@ -1124,10 +1124,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pRspWrapper->topicHandle = pTopic;
|
pRspWrapper->topicHandle = pTopic;
|
||||||
|
|
||||||
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
||||||
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
|
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
|
||||||
|
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
/*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
|
/*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
|
||||||
} else {
|
} else {
|
||||||
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
|
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
|
||||||
|
@ -1138,7 +1138,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
|
|
||||||
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId,
|
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId,
|
||||||
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType);
|
pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
|
@ -1203,17 +1203,17 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
for (int32_t j = 0; j < vgNumGet; j++) {
|
for (int32_t j = 0; j < vgNumGet; j++) {
|
||||||
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
||||||
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
|
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
|
||||||
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
|
STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
|
||||||
int64_t offset = tmq->resetOffsetCfg;
|
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
||||||
if (pOffset != NULL) {
|
if (pOffset != NULL) {
|
||||||
offset = *pOffset;
|
offsetNew = *pOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,
|
/*tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,*/
|
||||||
pVgEp->vgId, offset, vgKey);
|
/*pVgEp->vgId, offset, vgKey);*/
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
.pollCnt = 0,
|
.pollCnt = 0,
|
||||||
.currentOffsetNew.type = tmq->resetOffsetCfg,
|
.currentOffsetNew = offsetNew,
|
||||||
.vgId = pVgEp->vgId,
|
.vgId = pVgEp->vgId,
|
||||||
.epSet = pVgEp->epSet,
|
.epSet = pVgEp->epSet,
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
|
|
|
@ -142,7 +142,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, tlen);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
|
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
|
||||||
|
|
||||||
|
@ -332,7 +332,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) {
|
||||||
// TODO add push mgr
|
// TODO add push mgr
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer - 1);
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,6 +134,8 @@ endi
|
||||||
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
|
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
|
||||||
$sumOfMsgCnt = $data[0][2] + $data[1][2]
|
$sumOfMsgCnt = $data[0][2] + $data[1][2]
|
||||||
if $sumOfMsgCnt != $totalMsgCons then
|
if $sumOfMsgCnt != $totalMsgCons then
|
||||||
|
print total: $totalMsgCons
|
||||||
|
print sum: $sumOfMsgCnt
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue