diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index accaa8fa1d..62cb5d2bb2 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2565,15 +2565,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } bool needFetch = false; - + int32_t index = 0; for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); if (!pClientVg->receivedInfoFromVnode) { needFetch = true; - break; + continue; } - tmq_topic_assignment* pAssignment = &(*assignment)[j]; + tmq_topic_assignment* pAssignment = &(*assignment)[index++]; if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) { pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; } else { @@ -2603,7 +2603,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a terrno = TSDB_CODE_OUT_OF_MEMORY; for (int32_t i = 0; i < (*numOfAssignment); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - + if (pClientVg->receivedInfoFromVnode) { + continue; + } SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); if (pParam == NULL) { destroyCommonInfo(pCommon); @@ -2674,11 +2676,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } else { int32_t num = taosArrayGetSize(pCommon->pList); for(int32_t i = 0; i < num; ++i) { - (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + (*assignment)[index++] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId, (*assignment)[i].vgId, (*assignment)[i].currentOffset); } - *numOfAssignment = num; + *numOfAssignment = index; } // for (int32_t j = 0; j < (*numOfAssignment); ++j) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7ecd994b5a..48de21199b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); + mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0989b980ea..baba735d6c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,7 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.version = pOffset->val.version; - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } else { if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); @@ -593,6 +593,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDeleteMqDataRsp(&dataRsp); return -1; } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);