From 6f4f6c14fa8eede394218fd768378c81645540f7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 11 Jun 2022 14:02:48 +0800 Subject: [PATCH] enh(tmq): add log --- source/dnode/mnode/impl/src/mndConsumer.c | 7 +++++++ source/dnode/mnode/impl/src/mndOffset.c | 5 ++++- source/dnode/vnode/src/tq/tq.c | 12 +++++++++++- tests/system-test/fulltest.sh | 2 +- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7aabbf1e4f..4da3c906d7 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -447,6 +447,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerOld = mndAcquireConsumer(pMnode, consumerId); if (pConsumerOld == NULL) { + mInfo("receive subscribe request from new consumer: %ld", consumerId); + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; @@ -463,7 +465,12 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } else { /*taosRLockLatch(&pConsumerOld->lock);*/ + int32_t status = atomic_load_32(&pConsumerOld->status); + + mInfo("receive subscribe request from old consumer: %ld, current status: %s", consumerId, + mndConsumerStatusName(status)); + if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; goto SUBSCRIBE_OVER; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 00c8bb30d0..18f2e993b2 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -183,7 +183,10 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { for (int32_t i = 0; i < commitOffsetReq.num; i++) { SMqOffset *pOffset = &commitOffsetReq.offsets[i]; + mInfo("commit offset %ld to vg %d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId, + pOffset->cgroup, pOffset->topicName); if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) { + mError("submit offset to topic %s failed", pOffset->topicName); return -1; } bool create = false; @@ -192,7 +195,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName); if (pTopic == NULL) { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr()); + mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr()); continue; } pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj)); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2f4d6c11c6..e810bb9908 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -130,7 +130,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); - ASSERT(pHandle); + /*ASSERT(pHandle);*/ + if (pHandle == NULL) { + tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId, + pReq->subKey); + return -1; + } + if (pHandle->consumerId != consumerId) { + tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", + consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId); + return -1; + } int32_t consumerEpoch = atomic_load_32(&pHandle->epoch); while (consumerEpoch < reqEpoch) { diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 4d8a2a66eb..abc8a81248 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -98,7 +98,7 @@ python3 ./test.py -f 2-query/statecount.py python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/subscribeDb.py python3 ./test.py -f 7-tmq/subscribeDb0.py -#python3 ./test.py -f 7-tmq/subscribeDb1.py +python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py