refactor: add some logs for tmq.
This commit is contained in:
parent
85ee1318d2
commit
26925a1c13
|
@ -706,7 +706,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum,
|
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
|
||||||
numOfTopics);
|
numOfTopics);
|
||||||
taosThreadMutexUnlock(&tmq->lock);
|
taosThreadMutexUnlock(&tmq->lock);
|
||||||
|
|
||||||
|
@ -2177,10 +2177,12 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
||||||
|
|
||||||
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
|
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
|
||||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||||
|
if (waitingRspNum == 0) {
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
|
||||||
|
vgId);
|
||||||
|
tmqCommitDone(pParamSet);
|
||||||
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
|
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
|
||||||
waitingRspNum);
|
waitingRspNum);
|
||||||
|
|
||||||
if (waitingRspNum == 0) {
|
|
||||||
tmqCommitDone(pParamSet);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue