fix:add committed & seek process logic
This commit is contained in:
parent
8093ef2dc8
commit
cddf27e8b4
|
@ -1251,7 +1251,8 @@ TEST(clientCase, td_25129) {
|
|||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
|
||||
printf("assign i:%d, vgId:%d, committed:%lld, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, committed, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
|
|
@ -692,6 +692,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
taosArrayDestroy(rebOutput.modifyConsumers);
|
||||
taosArrayDestroy(rebOutput.rebVgs);
|
||||
|
||||
taosHashCancelIterate(pReq->rebSubHash, pIter);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mInfo("mq re-balance failed, due to out of memory");
|
||||
taosHashCleanup(pReq->rebSubHash);
|
||||
|
|
|
@ -336,10 +336,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
|||
STqOffset* pOffset = &vgOffset.offset;
|
||||
|
||||
if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
||||
pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
|
||||
} else if (pOffset->val.type == TMQ_OFFSET__LOG) {
|
||||
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
|
||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
|
||||
pOffset->val.version);
|
||||
} else {
|
||||
tqError("invalid commit offset type:%d", pOffset->val.type);
|
||||
|
@ -367,12 +367,12 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SRpcMsg rsp = {.info = pMsg->info};
|
||||
int code = 0;
|
||||
|
||||
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
|
||||
if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
|
||||
|
|
|
@ -624,6 +624,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
// return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TMQ_VG_WALINFO:
|
||||
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TMQ_VG_COMMITTEDINFO:
|
||||
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TMQ_SEEK:
|
||||
return tqProcessSeekReq(pVnode->pTq, pMsg);
|
||||
|
||||
default:
|
||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
Loading…
Reference in New Issue