Merge pull request #21961 from taosdata/fix/TD-25129

fix:bugs in tmq_get_topic_assignment
This commit is contained in:
Haojun Liao 2023-07-07 17:11:33 +08:00 committed by GitHub
commit 6c71f21cb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 80 deletions

View File

@ -1877,6 +1877,23 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0; return 0;
} }
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.currentOffset = *offset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever;
pVg->receivedInfoFromVnode = true;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
@ -1925,22 +1942,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values, only when the local offset is NOT updated updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
pVg->receivedInfoFromVnode = true;
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN];
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
@ -1990,11 +1992,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
@ -2022,18 +2020,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
// update the local offset value only for the returned values, only when the local offset is NOT updated updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) { if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
@ -2615,6 +2602,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId; pAssignment->vgId = pClientVg->vgId;
tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId,
pAssignment->vgId, pAssignment->currentOffset);
} }
if (needFetch) { if (needFetch) {
@ -2690,7 +2679,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
} }
@ -2718,17 +2707,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; // pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
char offsetBuf[TSDB_OFFSET_LEN] = {0}; // char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); // tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
pOffsetInfo->currentOffset.version = p->currentOffset; // pOffsetInfo->currentOffset.version = p->currentOffset;
pOffsetInfo->committedOffset.version = p->currentOffset; // pOffsetInfo->committedOffset.version = p->currentOffset;
} }
} }
} }

View File

@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId); taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId);
} }
} }

View File

@ -560,48 +560,32 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req); tqInitDataRsp(&dataRsp, &req);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); if (req.useSnapshot == true) {
if (pOffset != NULL) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
if (pOffset->val.type != TMQ_OFFSET__LOG) { terrno = TSDB_CODE_INVALID_PARA;
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey); tDeleteMqDataRsp(&dataRsp);
terrno = TSDB_CODE_INVALID_PARA; return -1;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
dataRsp.rspOffset.version = pOffset->val.version;
} else {
if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) {
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
dataRsp.rspOffset.version = reqOffset.version;
} else {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
} else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
reqOffset.type);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
} }
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = reqOffset.version;
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
} else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
reqOffset.type);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
tDeleteMqDataRsp(&dataRsp);
return 0; return 0;
} }