fix(taosx): reset latest offset

This commit is contained in:
Liu Jicong 2022-10-26 21:20:35 +08:00
parent 2a3448552e
commit 42422b7a65
1 changed files with 20 additions and 9 deletions

View File

@ -516,17 +516,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
SMqDataRsp dataRsp = {0}; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
}
tDeleteSMqDataRsp(&dataRsp);
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pReq);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
return code;
} }
tDeleteSMqDataRsp(&dataRsp);
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
" in vg %d, subkey %s, reset none failed", " in vg %d, subkey %s, reset none failed",