Merge pull request #25466 from taosdata/enh/TD-29758
enh:[TD-29758]optimize log in tmq
This commit is contained in:
commit
ab2c9aaf5b
|
@ -660,13 +660,13 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
|
||||||
|
|
||||||
taosRLockLatch(&tmq->lock);
|
taosRLockLatch(&tmq->lock);
|
||||||
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
||||||
tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
|
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
|
tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
|
||||||
numOfVgroups);
|
numOfVgroups);
|
||||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
|
@ -688,19 +688,19 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64
|
tscDebug("consumer:0x%" PRIx64
|
||||||
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
|
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
|
||||||
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
|
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
|
||||||
tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset);
|
tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset);
|
||||||
} else {
|
} else {
|
||||||
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
|
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
|
||||||
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
|
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&tmq->lock);
|
taosRUnLockLatch(&tmq->lock);
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
|
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
|
||||||
numOfTopics);
|
numOfTopics);
|
||||||
|
|
||||||
// request is sent
|
// request is sent
|
||||||
|
@ -815,7 +815,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
offRows->ever = pVg->offsetInfo.walVerEnd;
|
offRows->ever = pVg->offsetInfo.walVerEnd;
|
||||||
char buf[TSDB_OFFSET_LEN] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
|
||||||
tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
|
tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
|
||||||
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
|
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1058,6 +1058,7 @@ static void tmqMgmtInit(void) {
|
||||||
|
|
||||||
#define SET_ERROR_MSG_TMQ(MSG) \
|
#define SET_ERROR_MSG_TMQ(MSG) \
|
||||||
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
|
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
|
||||||
|
|
||||||
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
if (conf == NULL) {
|
if (conf == NULL) {
|
||||||
SET_ERROR_MSG_TMQ("configure is null")
|
SET_ERROR_MSG_TMQ("configure is null")
|
||||||
|
@ -1504,7 +1505,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
|
|
||||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||||
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
|
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
|
||||||
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
|
||||||
tmq->epoch, epoch, topicNumGet);
|
tmq->epoch, epoch, topicNumGet);
|
||||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||||
|
@ -1800,14 +1801,14 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
|
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
|
||||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
||||||
tmq->epoch, pVg->vgId);
|
tmq->epoch, pVg->vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tmq->replayEnable &&
|
if (tmq->replayEnable &&
|
||||||
taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
|
taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
|
||||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
||||||
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1815,7 +1816,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||||
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
||||||
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
||||||
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
|
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
|
||||||
pVg->vgId, vgSkipCnt);
|
pVg->vgId, vgSkipCnt);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1875,7 +1876,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
|
||||||
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " wait for the rebalance, set status to be RECOVER", tmq->consumerId);
|
||||||
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||||
terrno = pRspWrapper->code;
|
terrno = pRspWrapper->code;
|
||||||
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
|
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
|
||||||
|
@ -2476,7 +2477,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
SMqRspHead* head = pMsg->pData;
|
SMqRspHead* head = pMsg->pData;
|
||||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
|
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
|
||||||
if (pParam->sync) {
|
if (pParam->sync) {
|
||||||
SMqAskEpRsp rsp = {0};
|
SMqAskEpRsp rsp = {0};
|
||||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||||
|
@ -2581,7 +2582,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
||||||
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
|
@ -438,10 +438,10 @@ static void processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, S
|
||||||
}
|
}
|
||||||
|
|
||||||
static void printRebalanceLog(SMqRebOutputObj *pOutput){
|
static void printRebalanceLog(SMqRebOutputObj *pOutput){
|
||||||
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pOutput->pSub->key);
|
mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||||
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||||
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
|
mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
|
||||||
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,10 +451,10 @@ static void printRebalanceLog(SMqRebOutputObj *pOutput){
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz);
|
mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
|
mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -762,18 +762,18 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
|
||||||
|
|
||||||
bool mndRebTryStart() {
|
bool mndRebTryStart() {
|
||||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||||
mInfo("rebalance counter old val:%d", old);
|
if (old > 0) mInfo("[rebalance] counter old val:%d", old)
|
||||||
return old == 0;
|
return old == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndRebCntInc() {
|
void mndRebCntInc() {
|
||||||
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
|
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
|
||||||
mInfo("rebalance cnt inc, value:%d", val);
|
if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val)
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndRebCntDec() {
|
void mndRebCntDec() {
|
||||||
int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
|
int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
|
||||||
mInfo("rebalance cnt sub, value:%d", val);
|
if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearRebOutput(SMqRebOutputObj *rebOutput){
|
static void clearRebOutput(SMqRebOutputObj *rebOutput){
|
||||||
|
@ -848,10 +848,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
mInfo("[rebalance] start to process mq timer");
|
mDebug("[rebalance] start to process mq timer")
|
||||||
|
|
||||||
if (!mndRebTryStart()) {
|
if (!mndRebTryStart()) {
|
||||||
mInfo("[rebalance] mq rebalance already in progress, do nothing");
|
mInfo("[rebalance] mq rebalance already in progress, do nothing")
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -863,7 +863,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
|
taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
|
||||||
|
|
||||||
mndCheckConsumer(pMsg, rebSubHash);
|
mndCheckConsumer(pMsg, rebSubHash);
|
||||||
mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
|
if (taosHashGetSize(rebSubHash) > 0) {
|
||||||
|
mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(rebSubHash, pIter);
|
pIter = taosHashIterate(rebSubHash, pIter);
|
||||||
|
@ -887,13 +889,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
mndDoRebalance(pMnode, &rebInput, &rebOutput);
|
mndDoRebalance(pMnode, &rebInput, &rebOutput);
|
||||||
|
|
||||||
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) {
|
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) {
|
||||||
mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr());
|
mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr())
|
||||||
}
|
}
|
||||||
|
|
||||||
clearRebOutput(&rebOutput);
|
clearRebOutput(&rebOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("[rebalance] mq re-balance completed successfully, wait trans finish");
|
if (taosHashGetSize(rebSubHash) > 0) {
|
||||||
|
mInfo("[rebalance] mq rebalance completed successfully, wait trans finish")
|
||||||
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosHashCancelIterate(rebSubHash, pIter);
|
taosHashCancelIterate(rebSubHash, pIter);
|
||||||
|
|
|
@ -367,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
} while (0);
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. check re-balance status
|
// 2. check rebalance status
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqError("ERROR tmq poll: consumer:0x%" PRIx64
|
tqError("ERROR tmq poll: consumer:0x%" PRIx64
|
||||||
" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
|
@ -485,7 +485,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. check re-balance status
|
// 2. check rebalance status
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
consumerId, vgId, req.subKey, pHandle->consumerId);
|
consumerId, vgId, req.subKey, pHandle->consumerId);
|
||||||
|
@ -666,7 +666,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
||||||
}
|
}
|
||||||
if (req.newConsumerId == -1) {
|
if (req.newConsumerId == -1) {
|
||||||
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
STqHandle handle = {0};
|
STqHandle handle = {0};
|
||||||
|
|
Loading…
Reference in New Issue