Merge pull request #9862 from taosdata/feature/tq

fix hb crash
This commit is contained in:
Liu Jicong 2022-01-17 20:12:50 +08:00 committed by GitHub
commit 7135af5c69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 258 additions and 33 deletions

View File

@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf; return buf;
} }
typedef struct SMqHbVgInfo {
int32_t vgId;
} SMqHbVgInfo;
static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
return buf;
}
typedef struct SMqHbTopicInfo {
int32_t epoch;
int64_t topicUid;
char name[TSDB_TOPIC_FNAME_LEN];
SArray* pVgInfo;
} SMqHbTopicInfo;
static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
tlen += taosEncodeString(buf, pTopicInfo->name);
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
buf = taosDecodeStringTo(buf, pTopicInfo->name);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
for (int32_t i = 0; i < sz; i++) {
SMqHbVgInfo vgInfo;
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
}
return buf;
}
typedef struct SMqHbMsg {
int32_t status; // ask hb endpoint
int32_t epoch;
int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo>
} SMqHbMsg;
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pMsg->status);
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
tlen += taosEncodeFixedI64(buf, pMsg->consumerId);
int32_t sz = taosArrayGetSize(pMsg->pTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int i = 0; i < sz; i++) {
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
buf = taosDecodeFixedI32(buf, &pMsg->status);
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
for (int i = 0; i < sz; i++) {
SMqHbTopicInfo topicInfo;
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
taosArrayPush(pMsg->pTopics, &topicInfo);
}
return buf;
}
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
return buf; return buf;
} }
typedef struct SMqHbOneTopicBatchRsp {
char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
int tlen = 0;
tlen += taosEncodeString(buf, pBatchRsp->topicName);
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
tlen += taosEncodeSMqHbRsp(buf, pRsp);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
int32_t sz;
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbRsp rsp;
buf = taosDecodeSMqHbRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
}
return buf;
}
typedef struct SMqHbBatchRsp {
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
} SMqHbBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
int32_t sz;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
}
return buf;
}
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;

View File

@ -322,24 +322,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return NULL; return NULL;
} }
strcpy(kv.key, "groupId"); strcpy(kv.key, "mq-tmp");
kv.keyLen = strlen("groupId") + 1; kv.keyLen = strlen("mq-tmp") + 1;
kv.value = malloc(256); SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (kv.value == NULL) { if (pMqHb == NULL) {
free(kv.key); return pArray;
taosArrayDestroy(pArray);
return NULL;
} }
strcpy(kv.value, pTmq->groupId); pMqHb->consumerId = connKey.connId;
kv.valueLen = strlen(pTmq->groupId) + 1; SArray* clientTopics = pTmq->clientTopics;
int sz = taosArrayGetSize(clientTopics);
for (int i = 0; i < sz; i++) {
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
if (pCTopic->vgId == -1) {
pMqHb->status = 1;
break;
}
}
kv.value = pMqHb;
kv.valueLen = sizeof(SMqHbMsg);
taosArrayPush(pArray, &kv); taosArrayPush(pArray, &kv);
strcpy(kv.key, "clientUid");
kv.keyLen = strlen("clientUid") + 1;
*(uint32_t*)kv.value = pTmq->pTscObj->connId;
kv.valueLen = sizeof(uint32_t);
return NULL; return pArray;
} }
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {

View File

@ -277,12 +277,15 @@ TEST(testCase, connect_Test) {
// ASSERT_EQ(numOfFields, 0); // ASSERT_EQ(numOfFields, 0);
// //
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn);
//}
// //
// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); // pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); // printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
// } // }
// //
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); // taos_free_result(pRes);
// pRes = taos_query(pConn, "drop stable `123_$^)`"); // pRes = taos_query(pConn, "drop stable `123_$^)`");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {

View File

@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int32_t kvNum = taosHashGetSize(pReq->info); int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum); tlen += taosEncodeFixedI32(buf, kvNum);
SKv kv; SKv kv;
void* pIter = taosHashIterate(pReq->info, pIter); void* pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
kv.valueLen = taosHashGetDataLen(pIter); kv.valueLen = taosHashGetDataLen(pIter);

View File

@ -346,19 +346,23 @@ typedef struct SMqTopicObj {
char *logicalPlan; char *logicalPlan;
char *physicalPlan; char *physicalPlan;
SHashObj *cgroups; // SHashObj<SMqCGroup> SHashObj *cgroups; // SHashObj<SMqCGroup>
SHashObj *consumers; // SHashObj<SMqConsumerObj>
} SMqTopicObj; } SMqTopicObj;
// TODO: add cache and change name to id // TODO: add cache and change name to id
typedef struct SMqConsumerTopic { typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_NAME_LEN]; int32_t epoch;
SList *vgroups; // SList<int32_t> char name[TSDB_TOPIC_NAME_LEN];
//TODO: replace with something with ep
SList *vgroups; // SList<int32_t>
} SMqConsumerTopic; } SMqConsumerTopic;
typedef struct SMqConsumerObj { typedef struct SMqConsumerObj {
SRWLatch lock;
int64_t consumerId; int64_t consumerId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic> SArray *topics; // SArray<SMqConsumerTopic>
SHashObj *topicHash;
} SMqConsumerObj; } SMqConsumerObj;
typedef struct SMqSubConsumerObj { typedef struct SMqSubConsumerObj {

View File

@ -15,10 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndProfile.h" #include "mndProfile.h"
#include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTopic.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h"
#define QUERY_ID_SIZE 20 #define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18 #define QUERY_OBJ_ID_SIZE 18
@ -269,29 +272,95 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRsp->connKey = pReq->connKey;
SMqHbBatchRsp batchRsp;
batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
if (batchRsp.batchRsps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SClientHbKey connKey = pReq->connKey;
SHashObj* pObj = pReq->info;
SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
if (pKv == NULL) {
free(pRsp);
return NULL;
}
SMqHbMsg mqHb;
taosDecodeSMqMsg(pKv->value, &mqHb);
/*int64_t clientUid = htonl(pKv->value);*/
/*if (mqHb.epoch )*/
int sz = taosArrayGetSize(mqHb.pTopics);
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
for (int i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp innerBatchRsp;
innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
if (innerBatchRsp.rsps == NULL) {
//TODO
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
if (pConsumerTopic->epoch != topicInfo->epoch) {
//add new vgids into rsp
int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
for (int j = 0; j < vgSz; j++) {
SMqHbRsp innerRsp;
SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
taosArrayPush(innerBatchRsp.rsps, &innerRsp);
}
}
taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
}
int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
void* buf = malloc(tlen);
if (buf == NULL) {
//TODO
return NULL;
}
void* abuf = buf;
taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
pRsp->body = buf;
pRsp->bodyLen = tlen;
return pRsp;
}
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
char *batchReqStr = pReq->rpcMsg.pCont; char *batchReqStr = pReq->rpcMsg.pCont;
SClientHbBatchReq batchReq = {0}; SClientHbBatchReq batchReq = {0};
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq); tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
SArray *pArray = batchReq.reqs; SArray *pArray = batchReq.reqs;
int sz = taosArrayGetSize(pArray); int sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0}; SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SClientHbReq *pHbReq = taosArrayGet(pArray, i); SClientHbReq* pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL}; SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
taosArrayPush(batchRsp.rsps, &rsp); if (pRsp != NULL) {
taosArrayPush(batchRsp.rsps, pRsp);
free(pRsp);
}
} }
} }
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void *buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
void *bufCopy = buf; void* abuf = buf;
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp); tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
pReq->contLen = tlen; pReq->contLen = tlen;
pReq->pCont = buf; pReq->pCont = buf;
return 0; return 0;

View File

@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
SClientHbBatchRsp rsp = {0}; SClientHbBatchRsp rsp = {0};
tDeserializeSClientHbBatchRsp(pRspChar, &rsp); tDeserializeSClientHbBatchRsp(pRspChar, &rsp);
int sz = taosArrayGetSize(rsp.rsps); int sz = taosArrayGetSize(rsp.rsps);
ASSERT_EQ(sz, 1); ASSERT_EQ(sz, 0);
SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); //SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
EXPECT_EQ(pRsp->connKey.connId, 123); //EXPECT_EQ(pRsp->connKey.connId, 123);
EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); //EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
EXPECT_EQ(pRsp->status, 0); //EXPECT_EQ(pRsp->status, 0);
#if 0 #if 0
int32_t contLen = sizeof(SHeartBeatReq); int32_t contLen = sizeof(SHeartBeatReq);