From 798c3a9f825239103f572b5ca0e2bbe07a59b75f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 2 Apr 2022 23:03:12 +0800 Subject: [PATCH] add more log --- source/client/src/tmq.c | 12 ++++++----- source/dnode/mnode/impl/src/mndConsumer.c | 4 +--- source/dnode/mnode/impl/src/mndScheduler.c | 3 +++ source/dnode/mnode/impl/src/mndSubscribe.c | 24 ++++++++++++++++------ 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 6c0bdd4bab..602ecdf6ab 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -917,10 +917,10 @@ CREATE_MSG_FAIL: bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ - tscDebug("consumer %ld update ep epoch %d to epoch %d", tmq->consumerId, tmq->epoch, epoch); bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, topicNumGet); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); if (newTopics == NULL) { return false; @@ -938,17 +938,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { taosHashClear(pHash); topic.topicName = strdup(pTopicEp->topic); + tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); for (int32_t j = 0; j < topicNumCur; j++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); + tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur); if (vgNumCur == 0) break; for (int32_t k = 0; k < vgNumCur; k++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); - tscDebug("consumer %ld epoch %d vg %d build %s\n", tmq->consumerId, epoch, pVgCur->vgId, vgKey); + tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); } break; @@ -962,10 +964,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t offset = pVgEp->offset; - tscDebug("consumer %ld epoch %d vg %d offset og to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset); + tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); if (pOffset != NULL) { offset = *pOffset; - tscDebug("consumer %ld epoch %d vg %d found %s\n", tmq->consumerId, epoch, pVgEp->vgId, vgKey); + tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey); } tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset); SMqClientVg clientVg = { @@ -1231,7 +1233,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1); - tscDebug("consumer %ld skip vg %d skip cnt %ld", tmq->consumerId, pVg->vgId, skipCnt); + tscDebug("consumer %ld epoch %d skip vg %d skip cnt %ld", tmq->consumerId, tmq->epoch, pVg->vgId, skipCnt); continue; #if 0 if (skipCnt < 30000) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c4c93b7fc9..6080ec7710 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -160,10 +160,8 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId); - pOldConsumer->epoch++; - - // TODO handle update /*taosWLockLatch(&pOldConsumer->lock);*/ + atomic_add_fetch_32(&pOldConsumer->epoch, 1); /*taosWUnLockLatch(&pOldConsumer->lock);*/ return 0; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 4562d9e5d3..df7946a0c1 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -471,6 +471,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib consumerEp.consumerId = -1; consumerEp.epSet = plan->execNode.epSet; consumerEp.vgId = plan->execNode.nodeId; + + mDebug("init subscribption %s, assign vg: %d", pSub->key, consumerEp.vgId); + int32_t msgLen; if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) { sdbRelease(pSdb, pVgroup); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ae9a4198a4..57d98396ea 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -212,19 +212,24 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } + //TODO add lock ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); + int32_t serverEpoch = pConsumer->epoch; // TODO int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); - mTrace("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, pConsumer->epoch, hbStatus); + mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, hbStatus); atomic_store_32(&pConsumer->hbStatus, 0); /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/ strcpy(rsp.cgroup, pReq->cgroup); - if (epoch != pConsumer->epoch) { - mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch); + if (epoch != serverEpoch) { + mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, serverEpoch); + mDebug("consumer %ld try r lock", consumerId); + taosRLockLatch(&pConsumer->lock); + mDebug("consumer %ld r locked", consumerId); SArray *pTopics = pConsumer->currentTopics; int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -238,7 +243,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (consumerId == pSubConsumer->consumerId) { int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); - mInfo("topic %s has %d vg", topicName, pConsumer->epoch); + mInfo("topic %s has %d vg", topicName, serverEpoch); SMqSubTopicEp topicEp; strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); @@ -264,6 +269,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { } mndReleaseSubscribe(pMnode, pSub); } + taosRUnLockLatch(&pConsumer->lock); + mDebug("consumer %ld r unlock", consumerId); } int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); @@ -272,7 +279,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { return -1; } ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; - ((SMqRspHead *)buf)->epoch = pConsumer->epoch; + ((SMqRspHead *)buf)->epoch = serverEpoch; ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); @@ -395,7 +402,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); taosMemoryFreeClear(pRebSub->key); - mInfo("mq rebalance subscription: %s", pSub->key); + mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, (int32_t)taosArrayGetSize(pSub->unassignedVg)); // remove lost consumer for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { @@ -442,6 +449,9 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { } SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); + mDebug("consumer %ld try w lock", pRebConsumer->consumerId); + taosWLockLatch(&pRebConsumer->lock); + mDebug("consumer %ld w locked", pRebConsumer->consumerId); int32_t status = atomic_load_32(&pRebConsumer->status); if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || @@ -462,6 +472,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); mndTransAppendCommitlog(pTrans, pConsumerRaw); } + taosWUnLockLatch(&pRebConsumer->lock); + mDebug("consumer %ld w unlock", pRebConsumer->consumerId); mndReleaseConsumer(pMnode, pRebConsumer); }