From f51295a23c0b10796269913ac97fc70ee1a892d1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 12 Feb 2022 11:28:22 +0800 Subject: [PATCH] seperate rebalance msg from set conn msg --- example/src/tmq.c | 38 +++++++------- include/util/taoserror.h | 6 ++- source/client/src/tmq.c | 9 ++-- source/dnode/mnode/impl/src/mndConsumer.c | 3 +- source/dnode/mnode/impl/src/mndSubscribe.c | 60 +++++++++++++--------- source/dnode/vnode/src/tq/tq.c | 12 ++--- 6 files changed, 73 insertions(+), 55 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index c36675fa7e..99e0c443dd 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -44,35 +44,35 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); + /*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/ /*if (taos_errno(pRes) != 0) {*/ /*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/ /*return -1;*/ /*}*/ - taos_free_result(pRes); + /*taos_free_result(pRes);*/ - pRes = taos_query(pConn, "create table tu using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); + /*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/ + /*if (taos_errno(pRes) != 0) {*/ + /*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/ + /*return -1;*/ + /*}*/ + /*taos_free_result(pRes);*/ - pRes = taos_query(pConn, "create table tu2 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); + /*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/ + /*if (taos_errno(pRes) != 0) {*/ + /*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/ + /*return -1;*/ + /*}*/ + /*taos_free_result(pRes);*/ const char* sql = "select * from st1"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); - /*if (taos_errno(pRes) != 0) {*/ - /*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/ - /*return -1;*/ - /*}*/ - /*taos_free_result(pRes);*/ + if (taos_errno(pRes) != 0) { + printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); taos_close(pConn); return 0; } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 88d6f77f72..64e3a2433e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -246,7 +246,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) -// mnode-topic +// mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) #define TSDB_CODE_MND_TOPIC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E1) #define TSDB_CODE_MND_TOO_MANY_TOPICS TAOS_DEF_ERROR_CODE(0, 0x03E2) @@ -255,7 +255,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5) #define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6) #define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7) -#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E7) +#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8) +#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) +#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0) // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c0f0cf4dfe..9ff099fae0 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -506,7 +506,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; SMqClientVg* pVg = pParam->pVg; if (code != 0) { - /*printf("msg discard\n");*/ + printf("msg discard\n"); tsem_post(&pParam->rspSem); return 0; } @@ -517,7 +517,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { return -1; } tDecodeSMqConsumeRsp(pMsg->pData, pRsp); - /*printf("rsp %ld %ld %d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ + /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->numOfTopics == 0) { /*printf("no data\n");*/ free(pRsp); @@ -671,16 +671,19 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs)); + pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); + /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg); if (pReq == NULL) { + ASSERT(false); usleep(blocking_time * 1000); return NULL; } SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); if (param == NULL) { + ASSERT(false); usleep(blocking_time * 1000); return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e87021b44d..21ebea258d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -59,6 +59,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pConsumer->recentRemovedTopics = taosArrayInit(0, sizeof(char*)); pConsumer->epoch = 1; pConsumer->consumerId = consumerId; atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT); @@ -169,7 +170,7 @@ SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) { SSdb *pSdb = pMnode->pSdb; SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); if (pConsumer == NULL) { - /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; } return pConsumer; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c5bebb3249..28e769a7ff 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -55,7 +55,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, - const SMqConsumerEp *pSub); + const SMqConsumerEp *pConsumerEp); static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); @@ -133,6 +133,8 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume } static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { + + ASSERT(pConsumerEp->oldConsumerId != -1); int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); @@ -146,7 +148,7 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; action.contLen = sizeof(SMsgHead) + tlen; - action.msgType = TDMT_VND_MQ_SET_CONN; + action.msgType = TDMT_VND_MQ_REB; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -220,7 +222,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { //TODO int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); - mInfo("try to get sub ep, old val: %d", hbStatus); + mTrace("try to get sub ep, old val: %d", hbStatus); atomic_store_32(&pConsumer->hbStatus, 0); /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ @@ -311,7 +313,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) break; - int32_t hbStatus = atomic_fetch_add_32(&pConsumer->hbStatus, 1); + int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) { int32_t old = atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); @@ -324,10 +326,6 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId); } - /*pRebMsg->consumerId = pConsumer->consumerId;*/ - /*SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = - * sizeof(SMqDoRebalanceMsg)};*/ - /*pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);*/ } } int32_t status = atomic_load_32(&pConsumer->status); @@ -402,8 +400,6 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { int32_t vgEachConsumer = vgNum / consumerNum; int32_t imbalanceVg = vgNum % consumerNum; int32_t imbalanceSolved = 0; - /*SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));*/ - /*SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));*/ // iterate all consumers, set unassignedVgStash for (int i = 0; i < consumerNum; i++) { @@ -413,7 +409,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else vgThisConsumerAfterRb = vgEachConsumer; - mInfo("mq consumer:%ld, connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); + mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); @@ -435,7 +431,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); } - mInfo("mq consumer:%ld, status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status); + mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); @@ -461,9 +457,23 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { pConsumerEp->consumerId = pSubConsumer->consumerId; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); - mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId); + if (pConsumerEp->oldConsumerId == -1) { + char* topic; + char* cgroup; + mndSplitSubscribeKey(pSub->key, &topic, &cgroup); + SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic); - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, pConsumerEp->consumerId); + + mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); + mndReleaseTopic(pMnode, pTopic); + free(topic); + free(cgroup); + } else { + mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); + + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + } } } } @@ -707,11 +717,6 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql); return -1; } - /*if (pArray && taosArrayGetSize(pArray) != 1) {*/ - /*terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;*/ - /*mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));*/ - /*return -1;*/ - /*}*/ SMqConsumerEp consumerEp = {0}; consumerEp.status = 0; @@ -733,12 +738,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp) { + + ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SMqSetCVgReq req = { .vgId = vgId, - .oldConsumerId = pConsumerEp->oldConsumerId, + .oldConsumerId = -1, .newConsumerId = pConsumerEp->consumerId, .sql = pTopic->sql, .logicalPlan = pTopic->logicalPlan, @@ -892,7 +899,7 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const c SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); free(key); if (pSub == NULL) { - /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; } return pSub; } @@ -901,7 +908,7 @@ SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) { SSdb *pSdb = pMnode->pSdb; SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); if (pSub == NULL) { - /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; } return pSub; } @@ -1030,8 +1037,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { pConsumerEp->oldConsumerId = pConsumerEp->consumerId; pConsumerEp->consumerId = consumerId; taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); - mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); - atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + if (pConsumerEp->oldConsumerId == -1) { + mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); + } else { + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + } + // do not set status active to trigger rebalance + /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ } SSdbRaw *pRaw = mndSubActionEncode(pSub); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 79ce5370f2..de4929b7df 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -144,6 +144,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; + /*printf("vg %d get consume req\n", pReq->head.vgId);*/ + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { pMsg->pCont = NULL; @@ -158,6 +160,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); // TODO: support multiple topic in one req if (strcmp(pTopic->topicName, pReq->topic) != 0) { + ASSERT(false); continue; } @@ -181,6 +184,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (fetchOffset <= pTopic->committedOffset) { fetchOffset = pTopic->committedOffset + 1; } + /*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/ int8_t pos; int8_t skip = 0; SWalHead* pHead; @@ -297,20 +301,16 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); + ASSERT(req.oldConsumerId == -1); STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); + /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/ if (pConsumer == NULL) { pConsumer = calloc(sizeof(STqConsumerHandle), 1); if (pConsumer == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } - } else { - tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.newConsumerId); - tqHandlePurge(pTq->tqMeta, req.oldConsumerId); - terrno = TSDB_CODE_SUCCESS; - return 0; } strcpy(pConsumer->cgroup, req.cgroup); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));