From 0a83ecac8150ae69ce2ae1eff7b7a8aeb88ee9e8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 4 Jan 2022 10:40:34 +0800 Subject: [PATCH] add sub structure --- include/common/tmsg.h | 104 ++++++++++++---- include/util/tdef.h | 1 + source/dnode/mnode/impl/inc/mndDef.h | 103 +++++++++++----- source/dnode/mnode/impl/src/mndConsumer.c | 140 +++++++++++++++++----- 4 files changed, 264 insertions(+), 84 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ba8bb7ca2f..2b17e6db7a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -320,14 +320,23 @@ typedef struct SEpSet { } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { - if(buf == NULL) return sizeof(SEpSet); - memcpy(buf, pEp, sizeof(SEpSet)); - //TODO: endian conversion - return sizeof(SEpSet); + int tlen = 0; + tlen += taosEncodeFixedI8(buf, pEp->inUse); + tlen += taosEncodeFixedI8(buf, pEp->numOfEps); + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + tlen += taosEncodeFixedU16(buf, pEp->port[i]); + tlen += taosEncodeString(buf, pEp->fqdn[i]); + } + return tlen; } -static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEpSet) { - memcpy(pEpSet, buf, sizeof(SEpSet)); +static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { + buf = taosDecodeFixedI8(buf, &pEp->inUse); + buf = taosDecodeFixedI8(buf, &pEp->numOfEps); + for (int i = 0; i < TSDB_MAX_REPLICA; i++) { + buf = taosDecodeFixedU16(buf, &pEp->port[i]); + buf = taosDecodeStringTo(buf, pEp->fqdn[i]); + } return buf; } @@ -1088,16 +1097,16 @@ typedef struct STaskDropRsp { } STaskDropRsp; typedef struct { - int8_t igExists; - char* name; - char* physicalPlan; - char* logicalPlan; + int8_t igExists; + char* name; + char* physicalPlan; + char* logicalPlan; } SCMCreateTopicReq; static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { int tlen = 0; - tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan); return tlen; @@ -1127,41 +1136,62 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi } typedef struct { - char* topicName; - char* consumerGroup; + int32_t topicNum; int64_t consumerId; + char* consumerGroup; + char* topicName[]; } SCMSubscribeReq; static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { int tlen = 0; - tlen += taosEncodeString(buf, pReq->topicName); - tlen += taosEncodeString(buf, pReq->consumerGroup); + tlen += taosEncodeFixedI32(buf, pReq->topicNum); tlen += taosEncodeFixedI64(buf, pReq->consumerId); + tlen += taosEncodeString(buf, pReq->consumerGroup); + for(int i = 0; i < pReq->topicNum; i++) { + tlen += taosEncodeString(buf, pReq->topicName[i]); + } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { - buf = taosDecodeString(buf, &pReq->topicName); - buf = taosDecodeString(buf, &pReq->consumerGroup); + buf = taosDecodeFixedI32(buf, &pReq->topicNum); buf = taosDecodeFixedI64(buf, &pReq->consumerId); + buf = taosDecodeString(buf, &pReq->consumerGroup); + for(int i = 0; i < pReq->topicNum; i++) { + buf = taosDecodeString(buf, &pReq->topicName[i]); + } return buf; } -typedef struct { +typedef struct SMqSubTopic { int32_t vgId; - SEpSet pEpSet; + int64_t topicId; + SEpSet epSet; +} SMqSubTopic; + +typedef struct { + int32_t topicNum; + SMqSubTopic topics[]; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pRsp->vgId); - tlen += taosEncodeSEpSet(buf, &pRsp->pEpSet); + tlen += taosEncodeFixedI32(buf, pRsp->topicNum); + for(int i = 0; i < pRsp->topicNum; i++) { + tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); + tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); + tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); + } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { - buf = taosDecodeFixedI32(buf, &pRsp->vgId); - buf = taosDecodeSEpSet(buf, &pRsp->pEpSet); + buf = taosDecodeFixedI32(buf, &pRsp->topicNum); + for(int i = 0; i < pRsp->topicNum; i++) { + buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); + buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); + buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); + } return buf; } @@ -1170,10 +1200,36 @@ typedef struct { int64_t consumerId; int64_t consumerGroupId; int64_t offset; + char* sql; + char* logicalPlan; + char* physicalPlan; } SMVSubscribeReq; +static FORCE_INLINE int tSerializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pReq->topicId); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId); + tlen += taosEncodeFixedI64(buf, pReq->offset); + tlen += taosEncodeString(buf, pReq->sql); + tlen += taosEncodeString(buf, pReq->logicalPlan); + tlen += taosEncodeString(buf, pReq->physicalPlan); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) { + buf = taosDecodeFixedI64(buf, &pReq->topicId); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId); + buf = taosDecodeFixedI64(buf, &pReq->offset); + buf = taosDecodeString(buf, &pReq->sql); + buf = taosDecodeString(buf, &pReq->logicalPlan); + buf = taosDecodeString(buf, &pReq->physicalPlan); + return buf; +} + typedef struct { - int64_t newOffset; + int64_t status; } SMVSubscribeRsp; typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index 233e9f0f55..9f16b58e0d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -177,6 +177,7 @@ do { \ #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN +#define TSDB_CONSUMER_GROUP_LEN 192 #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a874e67210..986914a8e2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -22,6 +22,7 @@ #include "sync.h" #include "tmsg.h" #include "thash.h" +#include "tlist.h" #include "tlog.h" #include "trpc.h" #include "ttimer.h" @@ -288,14 +289,81 @@ typedef struct { char payload[]; } SShowObj; +typedef struct SConsumerObj { + uint64_t uid; + int64_t createTime; + int64_t updateTime; + //uint64_t dbUid; + int32_t version; + SRWLatch lock; + SList* topics; +} SConsumerObj; + +typedef struct SMqSubConsumerObj { + int64_t consumerUid; // if -1, unassigned + SList* vgId; //SList +} SMqSubConsumerObj; + +typedef struct SMqSubCGroupObj { + char name[TSDB_CONSUMER_GROUP_LEN]; + SList* consumers; //SList +} SMqSubCGroupObj; + +typedef struct SMqSubTopicObj { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + int64_t uid; + int64_t dbUid; + int32_t version; + SRWLatch lock; + int32_t sqlLen; + char* sql; + char* logicalPlan; + char* physicalPlan; + SList* cgroups; //SList +} SMqSubTopicObj; + +typedef struct SMqConsumerSubObj { + int64_t topicUid; + SList* vgIds; //SList +} SMqConsumerSubObj; + +typedef struct SMqConsumerHbObj { + int64_t consumerId; + SList* consumerSubs; //SList +} SMqConsumerHbObj; + +typedef struct SMqVGroupSubObj { + int64_t topicUid; + SList* consumerIds; //SList +} SMqVGroupSubObj; + +typedef struct SMqVGroupHbObj { + int64_t vgId; + SList* vgSubs; //SList +} SMqVGroupHbObj; + +typedef struct SCGroupObj { + char name[TSDB_TOPIC_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + //uint64_t dbUid; + int32_t version; + SRWLatch lock; + SList* consumerIds; +} SCGroupObj; + typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; uint64_t uid; uint64_t dbUid; - int32_t version; + int32_t version; SRWLatch lock; int32_t execLen; void* executor; @@ -303,32 +371,9 @@ typedef struct { char* sql; char* logicalPlan; char* physicalPlan; + SList* consumerIds; } STopicObj; -typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; - uint64_t uid; - //uint64_t dbUid; - int32_t version; - SRWLatch lock; - -} SConsumerObj; - -typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; - uint64_t uid; - //uint64_t dbUid; - int32_t version; - SRWLatch lock; - -} SCGroupObj; - typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9e7cdbf09e..5391b0f73e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -66,17 +66,82 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} +static SSdbRaw *mndCGroupActionEncode(SCGroupObj *pCGroup) { + int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN); + SDB_SET_INT64(pRaw, dataPos, pCGroup->createTime); + SDB_SET_INT64(pRaw, dataPos, pCGroup->updateTime); + SDB_SET_INT64(pRaw, dataPos, pCGroup->uid); + /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ + SDB_SET_INT32(pRaw, dataPos, pCGroup->version); + + int32_t sz = listNEles(pCGroup->consumerIds); + SDB_SET_INT32(pRaw, dataPos, sz); + + SListIter iter; + tdListInitIter(pCGroup->consumerIds, &iter, TD_LIST_FORWARD); + SListNode *pn = NULL; + while ((pn = tdListNext(&iter)) != NULL) { + int64_t consumerId = *(int64_t *)pn->data; + SDB_SET_INT64(pRaw, dataPos, consumerId); + } + + SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); + SDB_SET_DATALEN(pRaw, dataPos); + return pRaw; +} + +static SSdbRow *mndCGroupActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != MND_CONSUMER_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode cgroup since %s", terrstr()); + return NULL; + } + + // TODO: maximum size is not known + int32_t size = sizeof(SCGroupObj) + 128 * sizeof(int64_t); + SSdbRow *pRow = sdbAllocRow(size); + SCGroupObj *pCGroup = sdbGetRowObj(pRow); + if (pCGroup == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN); + SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->createTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->updateTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->uid); + /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ + SDB_GET_INT32(pRaw, pRow, dataPos, &pCGroup->version); + + int32_t sz; + SDB_GET_INT32(pRaw, pRow, dataPos, &sz); + // TODO: free list when failing + tdListInit(pCGroup->consumerIds, sizeof(int64_t)); + for (int i = 0; i < sz; i++) { + int64_t consumerId; + SDB_GET_INT64(pRaw, pRow, dataPos, &consumerId); + tdListAppend(pCGroup->consumerIds, &consumerId); + } + + SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); + return pRow; +} + static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) return NULL; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); - SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); + SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime); SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime); - SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ SDB_SET_INT32(pRaw, dataPos, pConsumer->version); @@ -102,11 +167,9 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { if (pConsumer == NULL) return NULL; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); - SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime); SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version); @@ -116,17 +179,17 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { } static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { - mTrace("consumer:%s, perform insert action", pConsumer->name); + mTrace("consumer:%ld, perform insert action", pConsumer->uid); return 0; } static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { - mTrace("consumer:%s, perform delete action", pConsumer->name); + mTrace("consumer:%ld, perform delete action", pConsumer->uid); return 0; } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { - mTrace("consumer:%s, perform update action", pOldConsumer->name); + mTrace("consumer:%ld, perform update action", pOldConsumer->uid); atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime); atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version); @@ -157,9 +220,34 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { char *msgStr = pMsg->rpcMsg.pCont; SCMSubscribeReq *pSubscribe; tDeserializeSCMSubscribeReq(msgStr, pSubscribe); - // add consumerGroupId -> list to sdb - // add consumerId -> list to sdb - // add consumer -> list to sdb + int topicNum = pSubscribe->topicNum; + int64_t consumerId = pSubscribe->consumerId; + char *consumerGroup = pSubscribe->consumerGroup; + // get consumer group and add client into it + SCGroupObj *pCGroup = sdbAcquire(pMnode->pSdb, SDB_CGROUP, consumerGroup); + if (pCGroup != NULL) { + // iterate the list until finding the consumer + // add consumer to cgroup list if not found + // put new record + } + + SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); + if (pConsumer != NULL) { + //reset topic list + } + + for (int i = 0; i < topicNum; i++) { + char *topicName = pSubscribe->topicName[i]; + STopicObj *pTopic = mndAcquireTopic(pMnode, topicName); + //get + // consumer id + SList *list = pTopic->consumerIds; + // add the consumer if not in the list + // + SList* topicList = pConsumer->topics; + //add to topic + } + return 0; } @@ -255,9 +343,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) break; - if (strcmp(pConsumer->db, dbName) == 0) { - numOfConsumers++; - } + numOfConsumers++; sdbRelease(pSdb, pConsumer); } @@ -316,11 +402,11 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMs return 0; } -static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveCGroup(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; - SConsumerObj *pConsumer = NULL; + SCGroupObj *pCGroup = NULL; int32_t cols = 0; char *pWrite; char prefix[64] = {0}; @@ -330,36 +416,28 @@ static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t prefixLen = (int32_t)strlen(prefix); while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); + pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pCGroup); if (pShow->pIter == NULL) break; - if (strncmp(pConsumer->name, prefix, prefixLen) != 0) { - sdbRelease(pSdb, pConsumer); + if (strncmp(pCGroup->name, prefix, prefixLen) != 0) { + sdbRelease(pSdb, pCGroup); continue; } cols = 0; char consumerName[TSDB_TABLE_NAME_LEN] = {0}; - tstrncpy(consumerName, pConsumer->name + prefixLen, TSDB_TABLE_NAME_LEN); + tstrncpy(consumerName, pCGroup->name + prefixLen, TSDB_TABLE_NAME_LEN); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, consumerName); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pConsumer->createTime; + *(int64_t *)pWrite = pCGroup->createTime; cols++; - /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ - /**(int32_t *)pWrite = pConsumer->numOfColumns;*/ - /*cols++;*/ - - /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ - /**(int32_t *)pWrite = pConsumer->numOfTags;*/ - /*cols++;*/ - numOfRows++; - sdbRelease(pSdb, pConsumer); + sdbRelease(pSdb, pCGroup); } pShow->numOfReads += numOfRows;