enh(tmq): add log
This commit is contained in:
parent
d53b9529eb
commit
6f4f6c14fa
|
@ -447,6 +447,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumerOld == NULL) {
|
if (pConsumerOld == NULL) {
|
||||||
|
mInfo("receive subscribe request from new consumer: %ld", consumerId);
|
||||||
|
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
|
@ -463,7 +465,12 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
/*taosRLockLatch(&pConsumerOld->lock);*/
|
/*taosRLockLatch(&pConsumerOld->lock);*/
|
||||||
|
|
||||||
int32_t status = atomic_load_32(&pConsumerOld->status);
|
int32_t status = atomic_load_32(&pConsumerOld->status);
|
||||||
|
|
||||||
|
mInfo("receive subscribe request from old consumer: %ld, current status: %s", consumerId,
|
||||||
|
mndConsumerStatusName(status));
|
||||||
|
|
||||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
goto SUBSCRIBE_OVER;
|
goto SUBSCRIBE_OVER;
|
||||||
|
|
|
@ -183,7 +183,10 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
|
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
|
||||||
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
|
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
|
||||||
|
mInfo("commit offset %ld to vg %d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId,
|
||||||
|
pOffset->cgroup, pOffset->topicName);
|
||||||
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
|
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
|
||||||
|
mError("submit offset to topic %s failed", pOffset->topicName);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
bool create = false;
|
bool create = false;
|
||||||
|
|
|
@ -130,7 +130,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(pHandle);
|
/*ASSERT(pHandle);*/
|
||||||
|
if (pHandle == NULL) {
|
||||||
|
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId,
|
||||||
|
pReq->subKey);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pHandle->consumerId != consumerId) {
|
||||||
|
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
||||||
|
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
while (consumerEpoch < reqEpoch) {
|
while (consumerEpoch < reqEpoch) {
|
||||||
|
|
|
@ -98,7 +98,7 @@ python3 ./test.py -f 2-query/statecount.py
|
||||||
python3 ./test.py -f 7-tmq/basic5.py
|
python3 ./test.py -f 7-tmq/basic5.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb.py
|
python3 ./test.py -f 7-tmq/subscribeDb.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeDb0.py
|
python3 ./test.py -f 7-tmq/subscribeDb0.py
|
||||||
#python3 ./test.py -f 7-tmq/subscribeDb1.py
|
python3 ./test.py -f 7-tmq/subscribeDb1.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb.py
|
python3 ./test.py -f 7-tmq/subscribeStb.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
||||||
|
|
Loading…
Reference in New Issue