diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e6defca724..f32fdcbae7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) return buf; } +typedef struct SMqHbVgInfo { + int32_t vgId; +} SMqHbVgInfo; + +static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { + buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); + return buf; +} + +typedef struct SMqHbTopicInfo { + int32_t epoch; + int64_t topicUid; + char name[TSDB_TOPIC_FNAME_LEN]; + SArray* pVgInfo; +} SMqHbTopicInfo; + +static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); + tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); + tlen += taosEncodeString(buf, pTopicInfo->name); + int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); + tlen += taosEncodeSMqVgInfo(buf, pVgInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { + buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); + buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); + buf = taosDecodeStringTo(buf, pTopicInfo->name); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo vgInfo; + buf = taosDecodeSMqVgInfo(buf, &vgInfo); + taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); + } + return buf; +} + +typedef struct SMqHbMsg { + int32_t status; // ask hb endpoint + int32_t epoch; + int64_t consumerId; + SArray* pTopics; // SArray +} SMqHbMsg; + +static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pMsg->status); + tlen += taosEncodeFixedI32(buf, pMsg->epoch); + tlen += taosEncodeFixedI64(buf, pMsg->consumerId); + int32_t sz = taosArrayGetSize(pMsg->pTopics); + tlen += taosEncodeFixedI32(buf, sz); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); + tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { + buf = taosDecodeFixedI32(buf, &pMsg->status); + buf = taosDecodeFixedI32(buf, &pMsg->epoch); + buf = taosDecodeFixedI64(buf, &pMsg->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo topicInfo; + buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); + taosArrayPush(pMsg->pTopics, &topicInfo); + } + return buf; +} typedef struct { int32_t vgId; @@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { return buf; } +typedef struct SMqHbOneTopicBatchRsp { + char topicName[TSDB_TOPIC_FNAME_LEN]; + SArray* rsps; // SArray +} SMqHbOneTopicBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeString(buf, pBatchRsp->topicName); + int32_t sz = taosArrayGetSize(pBatchRsp->rsps); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); + tlen += taosEncodeSMqHbRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { + int32_t sz; + buf = taosDecodeStringTo(buf, pBatchRsp->topicName); + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp rsp; + buf = taosDecodeSMqHbRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->rsps, &rsp); + } + return buf; +} + +typedef struct SMqHbBatchRsp { + int64_t consumerId; + SArray* batchRsps; // SArray +} SMqHbBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); + int32_t sz; + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); + tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { + buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp rsp; + buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); + } + return buf; +} + typedef struct { int32_t acctId; int64_t clusterId; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e2143d20df..51f267e884 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -322,24 +322,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { taosArrayDestroy(pArray); return NULL; } - strcpy(kv.key, "groupId"); - kv.keyLen = strlen("groupId") + 1; - kv.value = malloc(256); - if (kv.value == NULL) { - free(kv.key); - taosArrayDestroy(pArray); - return NULL; + strcpy(kv.key, "mq-tmp"); + kv.keyLen = strlen("mq-tmp") + 1; + SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); + if (pMqHb == NULL) { + return pArray; } - strcpy(kv.value, pTmq->groupId); - kv.valueLen = strlen(pTmq->groupId) + 1; - + pMqHb->consumerId = connKey.connId; + SArray* clientTopics = pTmq->clientTopics; + int sz = taosArrayGetSize(clientTopics); + for (int i = 0; i < sz; i++) { + SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); + if (pCTopic->vgId == -1) { + pMqHb->status = 1; + break; + } + } + kv.value = pMqHb; + kv.valueLen = sizeof(SMqHbMsg); taosArrayPush(pArray, &kv); - strcpy(kv.key, "clientUid"); - kv.keyLen = strlen("clientUid") + 1; - *(uint32_t*)kv.value = pTmq->pTscObj->connId; - kv.valueLen = sizeof(uint32_t); - return NULL; + return pArray; } tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 4170401f00..0ca2941297 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -277,12 +277,15 @@ TEST(testCase, connect_Test) { // ASSERT_EQ(numOfFields, 0); // // taos_free_result(pRes); +// taos_close(pConn); +//} // // pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); // if (taos_errno(pRes) != 0) { // printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); // } // +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // pRes = taos_query(pConn, "drop stable `123_$^)`"); // if (taos_errno(pRes) != 0) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4d1a07be21..48e9dce3c1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int32_t kvNum = taosHashGetSize(pReq->info); tlen += taosEncodeFixedI32(buf, kvNum); SKv kv; - void* pIter = taosHashIterate(pReq->info, pIter); + void* pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); kv.valueLen = taosHashGetDataLen(pIter); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0c2524f48c..de101b0f06 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -346,19 +346,23 @@ typedef struct SMqTopicObj { char *logicalPlan; char *physicalPlan; SHashObj *cgroups; // SHashObj + SHashObj *consumers; // SHashObj } SMqTopicObj; // TODO: add cache and change name to id typedef struct SMqConsumerTopic { - char name[TSDB_TOPIC_NAME_LEN]; - SList *vgroups; // SList + int32_t epoch; + char name[TSDB_TOPIC_NAME_LEN]; + //TODO: replace with something with ep + SList *vgroups; // SList } SMqConsumerTopic; typedef struct SMqConsumerObj { - SRWLatch lock; int64_t consumerId; + SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray + SHashObj *topicHash; } SMqConsumerObj; typedef struct SMqSubConsumerObj { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index b458403dbf..902eaa5c1c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -15,10 +15,13 @@ #define _DEFAULT_SOURCE #include "mndProfile.h" +#include "mndConsumer.h" #include "mndDb.h" #include "mndMnode.h" #include "mndShow.h" +#include "mndTopic.h" #include "mndUser.h" +#include "mndVgroup.h" #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 @@ -269,29 +272,95 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { return TSDB_CODE_SUCCESS; } +static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { + SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp)); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pRsp->connKey = pReq->connKey; + SMqHbBatchRsp batchRsp; + batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp)); + if (batchRsp.batchRsps == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + SClientHbKey connKey = pReq->connKey; + SHashObj* pObj = pReq->info; + SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1); + if (pKv == NULL) { + free(pRsp); + return NULL; + } + SMqHbMsg mqHb; + taosDecodeSMqMsg(pKv->value, &mqHb); + /*int64_t clientUid = htonl(pKv->value);*/ + /*if (mqHb.epoch )*/ + int sz = taosArrayGetSize(mqHb.pTopics); + SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); + for (int i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp innerBatchRsp; + innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); + if (innerBatchRsp.rsps == NULL) { + //TODO + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i); + SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1); + if (pConsumerTopic->epoch != topicInfo->epoch) { + //add new vgids into rsp + int vgSz = taosArrayGetSize(topicInfo->pVgInfo); + for (int j = 0; j < vgSz; j++) { + SMqHbRsp innerRsp; + SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i); + SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId); + innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj); + taosArrayPush(innerBatchRsp.rsps, &innerRsp); + } + } + taosArrayPush(batchRsp.batchRsps, &innerBatchRsp); + } + int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp); + void* buf = malloc(tlen); + if (buf == NULL) { + //TODO + return NULL; + } + void* abuf = buf; + taosEncodeSMqHbBatchRsp(&abuf, &batchRsp); + pRsp->body = buf; + pRsp->bodyLen = tlen; + return pRsp; +} + static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - char *batchReqStr = pReq->rpcMsg.pCont; + SMnode *pMnode = pReq->pMnode; + char *batchReqStr = pReq->rpcMsg.pCont; SClientHbBatchReq batchReq = {0}; tDeserializeSClientHbBatchReq(batchReqStr, &batchReq); SArray *pArray = batchReq.reqs; - int sz = taosArrayGetSize(pArray); + int sz = taosArrayGetSize(pArray); SClientHbBatchRsp batchRsp = {0}; batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); for (int i = 0; i < sz; i++) { - SClientHbReq *pHbReq = taosArrayGet(pArray, i); + SClientHbReq* pHbReq = taosArrayGet(pArray, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { + } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { - SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL}; - taosArrayPush(batchRsp.rsps, &rsp); + SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); + if (pRsp != NULL) { + taosArrayPush(batchRsp.rsps, pRsp); + free(pRsp); + } } } int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); - void *buf = rpcMallocCont(tlen); - void *bufCopy = buf; - tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp); + void* buf = rpcMallocCont(tlen); + void* abuf = buf; + tSerializeSClientHbBatchRsp(&abuf, &batchRsp); pReq->contLen = tlen; pReq->pCont = buf; return 0; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 4b329886eb..4ad979cdd3 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { SClientHbBatchRsp rsp = {0}; tDeserializeSClientHbBatchRsp(pRspChar, &rsp); int sz = taosArrayGetSize(rsp.rsps); - ASSERT_EQ(sz, 1); - SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); - EXPECT_EQ(pRsp->connKey.connId, 123); - EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); - EXPECT_EQ(pRsp->status, 0); + ASSERT_EQ(sz, 0); + //SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); + //EXPECT_EQ(pRsp->connKey.connId, 123); + //EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); + //EXPECT_EQ(pRsp->status, 0); #if 0 int32_t contLen = sizeof(SHeartBeatReq);