refactor: update some logs.
This commit is contained in:
parent
f4c8444b8f
commit
3645c52b25
|
@ -1799,8 +1799,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
if (pollRspWrapper->dataRsp.blockNum == 0) {
|
if (pollRspWrapper->dataRsp.blockNum == 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " empty block received in poll rsp", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId);
|
||||||
|
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
rspWrapper = NULL;
|
rspWrapper = NULL;
|
||||||
continue;
|
continue;
|
||||||
|
@ -1913,11 +1912,11 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||||
if (retryCnt++ > 10) {
|
if (retryCnt++ > 40) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
|
tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
|
||||||
taosMsleep(500);
|
taosMsleep(500);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -621,7 +621,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
|
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pExistedConsumer == NULL) {
|
if (pExistedConsumer == NULL) {
|
||||||
mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);
|
mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s, numOfTopics:%d", consumerId,
|
||||||
|
subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList));
|
||||||
|
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
||||||
|
|
Loading…
Reference in New Issue