From 6df2fae1a231b9d5d7960f5f34be0eb1bb9da758 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 8 Apr 2022 15:26:02 +0800 Subject: [PATCH] fix compile --- source/client/src/tmq.c | 5 +++-- source/dnode/mnode/impl/src/mndSubscribe.c | 20 +++++++++++--------- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/transport/src/transCli.c | 4 ++-- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a6e5fee2d1..5893a14bd5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1149,6 +1149,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; + pReq->reqId = generateRequestId(); pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqPollReq)); @@ -1279,7 +1280,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { .len = sizeof(SMqPollReq), .handle = NULL, }; - sendInfo->requestId = generateRequestId(); + sendInfo->requestId = pReq->reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqPollCb; @@ -1288,7 +1289,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - tscDebug("consumer %ld send poll: vg %d, epoch %d, req offset %ld", tmq->consumerId, pVg->vgId, tmq->epoch, pVg->currentOffset); + tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8078338238..dbd5e43b6d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -60,7 +60,7 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName); static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName); int32_t mndInitSubscribe(SMnode *pMnode) { @@ -102,12 +102,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { +static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* topicName) { SMqMVRebReq req = { .vgId = pConsumerEp->vgId, .oldConsumerId = pConsumerEp->oldConsumerId, .newConsumerId = pConsumerEp->consumerId, }; + req.topic = strdup(topicName); int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); @@ -122,6 +123,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqMVRebReq(&abuf, &req); + taosMemoryFree(req.topic); *pBuf = buf; *pLen = tlen; @@ -129,12 +131,12 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume return 0; } -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName) { ASSERT(pConsumerEp->oldConsumerId != -1); void *buf; int32_t tlen; - if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) { + if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) { return -1; } @@ -502,10 +504,10 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { pConsumerEp->epoch = 0; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup); if (pConsumerEp->oldConsumerId == -1) { - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId, @@ -517,7 +519,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic); } } } @@ -849,7 +851,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); } else { - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName); } // to trigger rebalance at once, do not set status active /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fbb515f29c..71a41c13d6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -265,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, offset %ld %ld, reqId %lu", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset, pReq->reqId); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -308,7 +308,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } - vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch); + vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d9c288a39b..9bd86a2e22 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -943,8 +943,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); - transSendAsync(thrd->asyncPool, &(cliMsg->q)); + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle); + ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {