Merge pull request #10228 from taosdata/feature/tq
seperate rebalance msg from set conn msg
This commit is contained in:
commit
9b7f20a458
|
@ -44,35 +44,35 @@ int32_t init_env() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
/*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/
|
||||||
/*if (taos_errno(pRes) != 0) {*/
|
/*if (taos_errno(pRes) != 0) {*/
|
||||||
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
|
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
|
||||||
/*return -1;*/
|
/*return -1;*/
|
||||||
/*}*/
|
/*}*/
|
||||||
taos_free_result(pRes);
|
/*taos_free_result(pRes);*/
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
/*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
/*if (taos_errno(pRes) != 0) {*/
|
||||||
printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));
|
/*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/
|
||||||
return -1;
|
/*return -1;*/
|
||||||
}
|
/*}*/
|
||||||
taos_free_result(pRes);
|
/*taos_free_result(pRes);*/
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tu2 using st1 tags(2)");
|
/*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
/*if (taos_errno(pRes) != 0) {*/
|
||||||
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
|
/*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/
|
||||||
return -1;
|
/*return -1;*/
|
||||||
}
|
/*}*/
|
||||||
taos_free_result(pRes);
|
/*taos_free_result(pRes);*/
|
||||||
|
|
||||||
|
|
||||||
const char* sql = "select * from st1";
|
const char* sql = "select * from st1";
|
||||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||||
/*if (taos_errno(pRes) != 0) {*/
|
if (taos_errno(pRes) != 0) {
|
||||||
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
|
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
|
||||||
/*return -1;*/
|
return -1;
|
||||||
/*}*/
|
}
|
||||||
/*taos_free_result(pRes);*/
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -247,7 +247,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
||||||
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
|
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
|
||||||
|
|
||||||
// mnode-topic
|
// mnode-mq
|
||||||
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
||||||
#define TSDB_CODE_MND_TOPIC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E1)
|
#define TSDB_CODE_MND_TOPIC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E1)
|
||||||
#define TSDB_CODE_MND_TOO_MANY_TOPICS TAOS_DEF_ERROR_CODE(0, 0x03E2)
|
#define TSDB_CODE_MND_TOO_MANY_TOPICS TAOS_DEF_ERROR_CODE(0, 0x03E2)
|
||||||
|
@ -256,7 +256,9 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
|
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
|
||||||
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
|
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
|
||||||
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
|
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
|
||||||
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E7)
|
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
|
||||||
|
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
|
||||||
|
#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
||||||
|
|
||||||
// dnode
|
// dnode
|
||||||
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
|
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||||
|
|
|
@ -506,7 +506,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
|
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
|
||||||
SMqClientVg* pVg = pParam->pVg;
|
SMqClientVg* pVg = pParam->pVg;
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
/*printf("msg discard\n");*/
|
printf("msg discard\n");
|
||||||
tsem_post(&pParam->rspSem);
|
tsem_post(&pParam->rspSem);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -517,7 +517,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
|
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
|
||||||
/*printf("rsp %ld %ld %d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||||
if (pRsp->numOfTopics == 0) {
|
if (pRsp->numOfTopics == 0) {
|
||||||
/*printf("no data\n");*/
|
/*printf("no data\n");*/
|
||||||
free(pRsp);
|
free(pRsp);
|
||||||
|
@ -671,16 +671,19 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
||||||
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs));
|
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
||||||
|
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg);
|
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
|
ASSERT(false);
|
||||||
usleep(blocking_time * 1000);
|
usleep(blocking_time * 1000);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam));
|
SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam));
|
||||||
if (param == NULL) {
|
if (param == NULL) {
|
||||||
|
ASSERT(false);
|
||||||
usleep(blocking_time * 1000);
|
usleep(blocking_time * 1000);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
pConsumer->recentRemovedTopics = taosArrayInit(0, sizeof(char*));
|
||||||
pConsumer->epoch = 1;
|
pConsumer->epoch = 1;
|
||||||
pConsumer->consumerId = consumerId;
|
pConsumer->consumerId = consumerId;
|
||||||
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
|
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
|
||||||
|
@ -169,7 +170,7 @@ SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
|
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
}
|
}
|
||||||
return pConsumer;
|
return pConsumer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
|
||||||
|
|
||||||
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||||
const SMqConsumerEp *pSub);
|
const SMqConsumerEp *pConsumerEp);
|
||||||
|
|
||||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
|
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
|
||||||
|
|
||||||
|
@ -133,6 +133,8 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
|
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
|
||||||
|
|
||||||
|
ASSERT(pConsumerEp->oldConsumerId != -1);
|
||||||
int32_t vgId = pConsumerEp->vgId;
|
int32_t vgId = pConsumerEp->vgId;
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
|
|
||||||
|
@ -146,7 +148,7 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
action.pCont = buf;
|
action.pCont = buf;
|
||||||
action.contLen = sizeof(SMsgHead) + tlen;
|
action.contLen = sizeof(SMsgHead) + tlen;
|
||||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
action.msgType = TDMT_VND_MQ_REB;
|
||||||
|
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
@ -220,7 +222,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
//TODO
|
//TODO
|
||||||
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
|
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
|
||||||
mInfo("try to get sub ep, old val: %d", hbStatus);
|
mTrace("try to get sub ep, old val: %d", hbStatus);
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
|
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
|
||||||
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
|
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
|
||||||
|
@ -311,7 +313,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
int32_t hbStatus = atomic_fetch_add_32(&pConsumer->hbStatus, 1);
|
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||||
if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
|
if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) {
|
||||||
int32_t old =
|
int32_t old =
|
||||||
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
|
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
|
||||||
|
@ -324,10 +326,6 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
/*pRebMsg->consumerId = pConsumer->consumerId;*/
|
|
||||||
/*SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen =
|
|
||||||
* sizeof(SMqDoRebalanceMsg)};*/
|
|
||||||
/*pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
@ -402,8 +400,6 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
int32_t vgEachConsumer = vgNum / consumerNum;
|
int32_t vgEachConsumer = vgNum / consumerNum;
|
||||||
int32_t imbalanceVg = vgNum % consumerNum;
|
int32_t imbalanceVg = vgNum % consumerNum;
|
||||||
int32_t imbalanceSolved = 0;
|
int32_t imbalanceSolved = 0;
|
||||||
/*SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));*/
|
|
||||||
/*SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));*/
|
|
||||||
|
|
||||||
// iterate all consumers, set unassignedVgStash
|
// iterate all consumers, set unassignedVgStash
|
||||||
for (int i = 0; i < consumerNum; i++) {
|
for (int i = 0; i < consumerNum; i++) {
|
||||||
|
@ -413,7 +409,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
|
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
|
||||||
else vgThisConsumerAfterRb = vgEachConsumer;
|
else vgThisConsumerAfterRb = vgEachConsumer;
|
||||||
|
|
||||||
mInfo("mq consumer:%ld, connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
|
mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
|
||||||
|
|
||||||
while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
|
while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
|
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
|
||||||
|
@ -435,7 +431,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("mq consumer:%ld, status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status);
|
mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);
|
||||||
|
|
||||||
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
||||||
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||||
|
@ -461,12 +457,26 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
||||||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||||
|
|
||||||
mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId);
|
if (pConsumerEp->oldConsumerId == -1) {
|
||||||
|
char* topic;
|
||||||
|
char* cgroup;
|
||||||
|
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
|
||||||
|
SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic);
|
||||||
|
|
||||||
|
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, pConsumerEp->consumerId);
|
||||||
|
|
||||||
|
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
free(topic);
|
||||||
|
free(cgroup);
|
||||||
|
} else {
|
||||||
|
mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
|
||||||
|
|
||||||
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
|
ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0);
|
||||||
|
|
||||||
// TODO: log rebalance statistics
|
// TODO: log rebalance statistics
|
||||||
|
@ -707,11 +717,6 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
|
||||||
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
|
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
/*if (pArray && taosArrayGetSize(pArray) != 1) {*/
|
|
||||||
/*terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;*/
|
|
||||||
/*mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));*/
|
|
||||||
/*return -1;*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
SMqConsumerEp consumerEp = {0};
|
SMqConsumerEp consumerEp = {0};
|
||||||
consumerEp.status = 0;
|
consumerEp.status = 0;
|
||||||
|
@ -733,12 +738,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
|
||||||
|
|
||||||
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||||
const SMqConsumerEp *pConsumerEp) {
|
const SMqConsumerEp *pConsumerEp) {
|
||||||
|
|
||||||
|
ASSERT(pConsumerEp->oldConsumerId == -1);
|
||||||
int32_t vgId = pConsumerEp->vgId;
|
int32_t vgId = pConsumerEp->vgId;
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
|
|
||||||
SMqSetCVgReq req = {
|
SMqSetCVgReq req = {
|
||||||
.vgId = vgId,
|
.vgId = vgId,
|
||||||
.oldConsumerId = pConsumerEp->oldConsumerId,
|
.oldConsumerId = -1,
|
||||||
.newConsumerId = pConsumerEp->consumerId,
|
.newConsumerId = pConsumerEp->consumerId,
|
||||||
.sql = pTopic->sql,
|
.sql = pTopic->sql,
|
||||||
.logicalPlan = pTopic->logicalPlan,
|
.logicalPlan = pTopic->logicalPlan,
|
||||||
|
@ -892,7 +899,7 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const c
|
||||||
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
||||||
free(key);
|
free(key);
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
|
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
@ -901,7 +908,7 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
|
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
@ -1030,8 +1037,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||||
pConsumerEp->consumerId = consumerId;
|
pConsumerEp->consumerId = consumerId;
|
||||||
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
|
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
|
||||||
|
if (pConsumerEp->oldConsumerId == -1) {
|
||||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||||
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
} else {
|
||||||
|
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
||||||
|
}
|
||||||
|
// do not set status active to trigger rebalance
|
||||||
|
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||||
|
|
|
@ -144,6 +144,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
|
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
|
||||||
|
|
||||||
|
/*printf("vg %d get consume req\n", pReq->head.vgId);*/
|
||||||
|
|
||||||
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
|
@ -158,6 +160,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
|
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
|
||||||
// TODO: support multiple topic in one req
|
// TODO: support multiple topic in one req
|
||||||
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
|
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
|
||||||
|
ASSERT(false);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,6 +184,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (fetchOffset <= pTopic->committedOffset) {
|
if (fetchOffset <= pTopic->committedOffset) {
|
||||||
fetchOffset = pTopic->committedOffset + 1;
|
fetchOffset = pTopic->committedOffset + 1;
|
||||||
}
|
}
|
||||||
|
/*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/
|
||||||
int8_t pos;
|
int8_t pos;
|
||||||
int8_t skip = 0;
|
int8_t skip = 0;
|
||||||
SWalHead* pHead;
|
SWalHead* pHead;
|
||||||
|
@ -297,20 +301,16 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
||||||
SMqSetCVgReq req = {0};
|
SMqSetCVgReq req = {0};
|
||||||
tDecodeSMqSetCVgReq(msg, &req);
|
tDecodeSMqSetCVgReq(msg, &req);
|
||||||
|
ASSERT(req.oldConsumerId == -1);
|
||||||
|
|
||||||
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
|
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
|
||||||
|
/*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
|
|
||||||
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
|
|
||||||
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
strcpy(pConsumer->cgroup, req.cgroup);
|
strcpy(pConsumer->cgroup, req.cgroup);
|
||||||
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
|
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
|
||||||
|
|
Loading…
Reference in New Issue