fix compile
This commit is contained in:
parent
ca7eee7451
commit
6df2fae1a2
|
@ -1149,6 +1149,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
|
||||||
pReq->consumerId = tmq->consumerId;
|
pReq->consumerId = tmq->consumerId;
|
||||||
pReq->epoch = tmq->epoch;
|
pReq->epoch = tmq->epoch;
|
||||||
pReq->currentOffset = reqOffset;
|
pReq->currentOffset = reqOffset;
|
||||||
|
pReq->reqId = generateRequestId();
|
||||||
|
|
||||||
pReq->head.vgId = htonl(pVg->vgId);
|
pReq->head.vgId = htonl(pVg->vgId);
|
||||||
pReq->head.contLen = htonl(sizeof(SMqPollReq));
|
pReq->head.contLen = htonl(sizeof(SMqPollReq));
|
||||||
|
@ -1279,7 +1280,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
.len = sizeof(SMqPollReq),
|
.len = sizeof(SMqPollReq),
|
||||||
.handle = NULL,
|
.handle = NULL,
|
||||||
};
|
};
|
||||||
sendInfo->requestId = generateRequestId();
|
sendInfo->requestId = pReq->reqId;
|
||||||
sendInfo->requestObjRefId = 0;
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = pParam;
|
sendInfo->param = pParam;
|
||||||
sendInfo->fp = tmqPollCb;
|
sendInfo->fp = tmqPollCb;
|
||||||
|
@ -1288,7 +1289,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
/*printf("send poll\n");*/
|
/*printf("send poll\n");*/
|
||||||
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
||||||
tscDebug("consumer %ld send poll: vg %d, epoch %d, req offset %ld", tmq->consumerId, pVg->vgId, tmq->epoch, pVg->currentOffset);
|
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
|
||||||
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||||
pVg->pollCnt++;
|
pVg->pollCnt++;
|
||||||
|
|
|
@ -60,7 +60,7 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
|
||||||
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||||
const SMqConsumerEp *pConsumerEp);
|
const SMqConsumerEp *pConsumerEp);
|
||||||
|
|
||||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
|
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName);
|
||||||
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName);
|
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName);
|
||||||
|
|
||||||
int32_t mndInitSubscribe(SMnode *pMnode) {
|
int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||||
|
@ -102,12 +102,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
|
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* topicName) {
|
||||||
SMqMVRebReq req = {
|
SMqMVRebReq req = {
|
||||||
.vgId = pConsumerEp->vgId,
|
.vgId = pConsumerEp->vgId,
|
||||||
.oldConsumerId = pConsumerEp->oldConsumerId,
|
.oldConsumerId = pConsumerEp->oldConsumerId,
|
||||||
.newConsumerId = pConsumerEp->consumerId,
|
.newConsumerId = pConsumerEp->consumerId,
|
||||||
};
|
};
|
||||||
|
req.topic = strdup(topicName);
|
||||||
|
|
||||||
int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
|
int32_t tlen = tEncodeSMqMVRebReq(NULL, &req);
|
||||||
void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
|
void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen);
|
||||||
|
@ -122,6 +123,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
|
||||||
|
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
tEncodeSMqMVRebReq(&abuf, &req);
|
tEncodeSMqMVRebReq(&abuf, &req);
|
||||||
|
taosMemoryFree(req.topic);
|
||||||
|
|
||||||
*pBuf = buf;
|
*pBuf = buf;
|
||||||
*pLen = tlen;
|
*pLen = tlen;
|
||||||
|
@ -129,12 +131,12 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
|
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName) {
|
||||||
ASSERT(pConsumerEp->oldConsumerId != -1);
|
ASSERT(pConsumerEp->oldConsumerId != -1);
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
|
if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -502,10 +504,10 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
||||||
pConsumerEp->epoch = 0;
|
pConsumerEp->epoch = 0;
|
||||||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||||
|
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
|
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||||
if (pConsumerEp->oldConsumerId == -1) {
|
if (pConsumerEp->oldConsumerId == -1) {
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||||
|
|
||||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
|
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
|
||||||
|
@ -517,7 +519,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
||||||
mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
|
mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "",
|
||||||
pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
|
pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
|
||||||
|
|
||||||
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -849,7 +851,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||||
} else {
|
} else {
|
||||||
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName);
|
||||||
}
|
}
|
||||||
// to trigger rebalance at once, do not set status active
|
// to trigger rebalance at once, do not set status active
|
||||||
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
|
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
|
||||||
|
|
|
@ -265,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
fetchOffset = pReq->currentOffset + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
|
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, offset %ld %ld, reqId %lu", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset, pReq->reqId);
|
||||||
|
|
||||||
SMqPollRsp rsp = {
|
SMqPollRsp rsp = {
|
||||||
/*.consumerId = consumerId,*/
|
/*.consumerId = consumerId,*/
|
||||||
|
@ -308,7 +308,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch);
|
vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName);
|
||||||
|
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
rsp.reqOffset = pReq->currentOffset;
|
||||||
rsp.skipLogNum = 0;
|
rsp.skipLogNum = 0;
|
||||||
|
|
|
@ -943,8 +943,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
|
|
||||||
tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port);
|
tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle);
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
|
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) {
|
||||||
|
|
Loading…
Reference in New Issue