fix:vginfo assigment not in same time,we should get vginfo by vg,not all vg
This commit is contained in:
parent
bdfeb32923
commit
34a62c3f38
|
@ -2565,15 +2565,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
}
|
}
|
||||||
|
|
||||||
bool needFetch = false;
|
bool needFetch = false;
|
||||||
|
int32_t index = 0;
|
||||||
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (!pClientVg->receivedInfoFromVnode) {
|
if (!pClientVg->receivedInfoFromVnode) {
|
||||||
needFetch = true;
|
needFetch = true;
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_topic_assignment* pAssignment = &(*assignment)[j];
|
tmq_topic_assignment* pAssignment = &(*assignment)[index++];
|
||||||
if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
||||||
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
|
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2603,7 +2603,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
for (int32_t i = 0; i < (*numOfAssignment); ++i) {
|
for (int32_t i = 0; i < (*numOfAssignment); ++i) {
|
||||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
||||||
|
if (pClientVg->receivedInfoFromVnode) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
|
SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
destroyCommonInfo(pCommon);
|
destroyCommonInfo(pCommon);
|
||||||
|
@ -2674,11 +2676,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
} else {
|
} else {
|
||||||
int32_t num = taosArrayGetSize(pCommon->pList);
|
int32_t num = taosArrayGetSize(pCommon->pList);
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
(*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
|
(*assignment)[index++] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
|
||||||
tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId,
|
tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId,
|
||||||
(*assignment)[i].vgId, (*assignment)[i].currentOffset);
|
(*assignment)[i].vgId, (*assignment)[i].currentOffset);
|
||||||
}
|
}
|
||||||
*numOfAssignment = num;
|
*numOfAssignment = index;
|
||||||
}
|
}
|
||||||
|
|
||||||
// for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
// for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||||
|
|
|
@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
|
||||||
|
|
||||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||||
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
|
mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -571,7 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||||
dataRsp.rspOffset.version = pOffset->val.version;
|
dataRsp.rspOffset.version = pOffset->val.version;
|
||||||
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
|
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
|
||||||
} else {
|
} else {
|
||||||
if (req.useSnapshot == true) {
|
if (req.useSnapshot == true) {
|
||||||
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
|
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
|
||||||
|
@ -593,6 +593,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tDeleteMqDataRsp(&dataRsp);
|
tDeleteMqDataRsp(&dataRsp);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
|
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
|
||||||
|
|
Loading…
Reference in New Issue