add heartbeat framework
This commit is contained in:
parent
f7a4eb00a8
commit
8a437a89dc
|
@ -1126,7 +1126,7 @@ typedef struct {
|
||||||
int32_t topicNum;
|
int32_t topicNum;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char* consumerGroup;
|
char* consumerGroup;
|
||||||
char* topicName[];
|
SArray* topicNames; // SArray<char*>
|
||||||
} SCMSubscribeReq;
|
} SCMSubscribeReq;
|
||||||
|
|
||||||
static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||||
|
@ -1134,8 +1134,9 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
|
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
||||||
tlen += taosEncodeString(buf, pReq->consumerGroup);
|
tlen += taosEncodeString(buf, pReq->consumerGroup);
|
||||||
|
|
||||||
for(int i = 0; i < pReq->topicNum; i++) {
|
for(int i = 0; i < pReq->topicNum; i++) {
|
||||||
tlen += taosEncodeString(buf, pReq->topicName[i]);
|
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
@ -1144,8 +1145,11 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
|
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||||
buf = taosDecodeString(buf, &pReq->consumerGroup);
|
buf = taosDecodeString(buf, &pReq->consumerGroup);
|
||||||
|
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
|
||||||
for(int i = 0; i < pReq->topicNum; i++) {
|
for(int i = 0; i < pReq->topicNum; i++) {
|
||||||
buf = taosDecodeString(buf, &pReq->topicName[i]);
|
char* name = NULL;
|
||||||
|
buf = taosDecodeString(buf, &name);
|
||||||
|
taosArrayPush(pReq->topicNames, &name);
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -370,6 +370,29 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
|
||||||
return POINTER_SHIFT(buf, size);
|
return POINTER_SHIFT(buf, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- binary
|
||||||
|
static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valueLen) {
|
||||||
|
int tlen = 0;
|
||||||
|
|
||||||
|
if (buf != NULL) {
|
||||||
|
memcpy(*buf, value, valueLen);
|
||||||
|
*buf = POINTER_SHIFT(*buf, valueLen);
|
||||||
|
}
|
||||||
|
tlen += (int)valueLen;
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int valueLen) {
|
||||||
|
uint64_t size = 0;
|
||||||
|
|
||||||
|
*value = malloc((size_t)valueLen);
|
||||||
|
if (*value == NULL) return NULL;
|
||||||
|
memcpy(*value, buf, (size_t)size);
|
||||||
|
|
||||||
|
return POINTER_SHIFT(buf, size);
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -210,6 +210,14 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
|
||||||
*/
|
*/
|
||||||
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
|
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the corresponding data length for a given data in hash table
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t taosHashGetDataLen(void *data);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return the payload data with the specified key(reference number added)
|
* return the payload data with the specified key(reference number added)
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
mq = 0,
|
||||||
|
HEARTBEAT_TYPE_MAX
|
||||||
|
} EHbType;
|
||||||
|
|
||||||
|
typedef struct SKlv {
|
||||||
|
int32_t keyLen;
|
||||||
|
int32_t valueLen;
|
||||||
|
void* key;
|
||||||
|
void* value;
|
||||||
|
} SKlv;
|
||||||
|
|
||||||
|
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
|
||||||
|
int tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
|
||||||
|
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
|
||||||
|
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
|
||||||
|
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
|
||||||
|
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
|
||||||
|
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SClientHbKey {
|
||||||
|
int32_t connId;
|
||||||
|
int32_t hbType;
|
||||||
|
} SClientHbKey;
|
||||||
|
|
||||||
|
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
|
||||||
|
int tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI32(buf, pKey->connId);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pKey->hbType);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
|
||||||
|
buf = taosDecodeFixedI32(buf, &pKey->connId);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pKey->hbType);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SClientHbReq {
|
||||||
|
SClientHbKey hbKey;
|
||||||
|
SHashObj* info; // hash<Sklv>
|
||||||
|
} SClientHbReq;
|
||||||
|
|
||||||
|
static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) {
|
||||||
|
int tlen = 0;
|
||||||
|
tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey);
|
||||||
|
|
||||||
|
void* pIter = NULL;
|
||||||
|
void* data;
|
||||||
|
SKlv klv;
|
||||||
|
data = taosHashIterate(pReq->info, pIter);
|
||||||
|
while (data != NULL) {
|
||||||
|
taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen);
|
||||||
|
klv.valueLen = taosHashGetDataLen(data);
|
||||||
|
klv.value = data;
|
||||||
|
taosEncodeSKlv(buf, &klv);
|
||||||
|
|
||||||
|
data = taosHashIterate(pReq->info, pIter);
|
||||||
|
}
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) {
|
||||||
|
ASSERT(pReq->info != NULL);
|
||||||
|
buf = taosDecodeSClientHbKey(buf, &pReq->hbKey);
|
||||||
|
|
||||||
|
//TODO: error handling
|
||||||
|
if(pReq->info == NULL) {
|
||||||
|
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
}
|
||||||
|
SKlv klv;
|
||||||
|
buf = taosDecodeSKlv(buf, &klv);
|
||||||
|
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SClientHbBatchReq {
|
||||||
|
int64_t reqId;
|
||||||
|
SArray* reqs; // SArray<SClientHbReq>
|
||||||
|
} SClientHbBatchReq;
|
||||||
|
|
||||||
|
typedef struct SClientHbHandleResult {
|
||||||
|
} SClientHbHandleResult;
|
||||||
|
|
||||||
|
typedef struct SClientHbRsp {
|
||||||
|
int32_t connId;
|
||||||
|
int32_t hbType;
|
||||||
|
} SClientHbRsp;
|
||||||
|
|
||||||
|
typedef struct SClientHbBatchRsp {
|
||||||
|
int64_t reqId;
|
||||||
|
int64_t rspId;
|
||||||
|
SArray* rsps; // SArray<SClientHbRsp>
|
||||||
|
} SClientHbBatchRsp;
|
||||||
|
|
||||||
|
typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq);
|
||||||
|
typedef int32_t (*FGetConnInfo)(int32_t conn, void* self);
|
||||||
|
|
||||||
|
typedef struct SClientHbMgr {
|
||||||
|
int8_t inited;
|
||||||
|
int32_t reportInterval; // unit ms
|
||||||
|
int32_t stats;
|
||||||
|
SRWLatch lock;
|
||||||
|
SHashObj* info; //hash<SClientHbKey, SClientHbReq>
|
||||||
|
FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
|
||||||
|
// input queue
|
||||||
|
} SClientHbMgr;
|
||||||
|
|
||||||
|
static SClientHbMgr clientHbMgr = {0};
|
||||||
|
|
||||||
|
int hbMgrInit();
|
||||||
|
void hbMgrCleanUp();
|
||||||
|
|
||||||
|
int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle);
|
||||||
|
|
||||||
|
int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle);
|
||||||
|
|
||||||
|
int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen);
|
|
@ -0,0 +1,60 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "clientHb.h"
|
||||||
|
|
||||||
|
static int32_t mqHbRspHandle(SClientHbReq* pReq) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int hbMgrInit() {
|
||||||
|
//init once
|
||||||
|
//
|
||||||
|
//init lock
|
||||||
|
//
|
||||||
|
//init handle funcs
|
||||||
|
clientHbMgr.handle[mq] = mqHbRspHandle;
|
||||||
|
|
||||||
|
//init stat
|
||||||
|
clientHbMgr.stats = 0;
|
||||||
|
|
||||||
|
//init config
|
||||||
|
clientHbMgr.reportInterval = 1500;
|
||||||
|
|
||||||
|
//init hash info
|
||||||
|
//
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hbMgrCleanUp() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) {
|
||||||
|
//lock
|
||||||
|
|
||||||
|
//find req by connection id
|
||||||
|
|
||||||
|
//unlock
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -155,7 +155,6 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
CONSUME_DECODE_OVER:
|
CONSUME_DECODE_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||||
|
@ -209,6 +208,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
|
tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
|
||||||
int64_t consumerId = pSubscribe->consumerId;
|
int64_t consumerId = pSubscribe->consumerId;
|
||||||
char *consumerGroup = pSubscribe->consumerGroup;
|
char *consumerGroup = pSubscribe->consumerGroup;
|
||||||
|
int32_t cgroupLen = strlen(consumerGroup);
|
||||||
|
|
||||||
SArray *newSub = NULL;
|
SArray *newSub = NULL;
|
||||||
int newTopicNum = pSubscribe->topicNum;
|
int newTopicNum = pSubscribe->topicNum;
|
||||||
|
@ -216,13 +216,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
|
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
|
||||||
}
|
}
|
||||||
for (int i = 0; i < newTopicNum; i++) {
|
for (int i = 0; i < newTopicNum; i++) {
|
||||||
char *topic = pSubscribe->topicName[i];
|
char *newTopicName = taosArrayGetP(newSub, i);
|
||||||
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
|
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
|
||||||
if (pConsumerTopic == NULL) {
|
if (pConsumerTopic == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
// TODO: free
|
// TODO: free
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
strcpy(pConsumerTopic->name, newTopicName);
|
||||||
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
|
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
|
||||||
taosArrayPush(newSub, pConsumerTopic);
|
taosArrayPush(newSub, pConsumerTopic);
|
||||||
free(pConsumerTopic);
|
free(pConsumerTopic);
|
||||||
|
@ -239,7 +240,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
strcpy(pConsumer->cgroup, pSubscribe->consumerGroup);
|
pConsumer->consumerId = consumerId;
|
||||||
|
strcpy(pConsumer->cgroup, consumerGroup);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
oldSub = pConsumer->topics;
|
oldSub = pConsumer->topics;
|
||||||
|
@ -260,6 +262,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
j++;
|
j++;
|
||||||
} else if (j >= oldTopicNum) {
|
} else if (j >= oldTopicNum) {
|
||||||
pNewTopic = taosArrayGet(newSub, i);
|
pNewTopic = taosArrayGet(newSub, i);
|
||||||
|
i++;
|
||||||
} else {
|
} else {
|
||||||
pNewTopic = taosArrayGet(newSub, i);
|
pNewTopic = taosArrayGet(newSub, i);
|
||||||
pOldTopic = taosArrayGet(oldSub, j);
|
pOldTopic = taosArrayGet(oldSub, j);
|
||||||
|
@ -292,7 +295,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
|
||||||
ASSERT(pTopic != NULL);
|
ASSERT(pTopic != NULL);
|
||||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
|
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||||
while ((pn = tdListNext(&iter)) != NULL) {
|
while ((pn = tdListNext(&iter)) != NULL) {
|
||||||
int32_t vgId = *(int64_t *)pn->data;
|
int32_t vgId = *(int64_t *)pn->data;
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
|
@ -302,8 +305,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// acquire and get epset
|
// acquire and get epset
|
||||||
void *pMqVgSetReq =
|
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
|
||||||
mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, pSubscribe->consumerId, pSubscribe->consumerGroup);
|
|
||||||
// TODO:serialize
|
// TODO:serialize
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -321,7 +323,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosHashRemove(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
|
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
||||||
} else if (pNewTopic != NULL) {
|
} else if (pNewTopic != NULL) {
|
||||||
ASSERT(pOldTopic == NULL);
|
ASSERT(pOldTopic == NULL);
|
||||||
|
@ -330,7 +333,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
|
||||||
ASSERT(pTopic != NULL);
|
ASSERT(pTopic != NULL);
|
||||||
|
|
||||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
|
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||||
if (pGroup == NULL) {
|
if (pGroup == NULL) {
|
||||||
// add new group
|
// add new group
|
||||||
pGroup = malloc(sizeof(SMqCGroup));
|
pGroup = malloc(sizeof(SMqCGroup));
|
||||||
|
@ -346,18 +349,20 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
pGroup->status = 0;
|
pGroup->status = 0;
|
||||||
// add into cgroups
|
// add into cgroups
|
||||||
taosHashPut(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup), pGroup,
|
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
|
||||||
sizeof(SMqCGroup));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// put the consumer into list
|
// put the consumer into list
|
||||||
// rebalance will be triggered by timer
|
// rebalance will be triggered by timer
|
||||||
tdListAppend(pGroup->consumerIds, &pSubscribe->consumerId);
|
tdListAppend(pGroup->consumerIds, &consumerId);
|
||||||
|
|
||||||
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
|
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
|
||||||
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
|
||||||
// TODO: error handling
|
// TODO: error handling
|
||||||
mndTransAppendRedolog(pTrans, pTopicRaw);
|
mndTransAppendRedolog(pTrans, pTopicRaw);
|
||||||
|
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -376,11 +381,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: free memory
|
// TODO: free memory
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,25 +58,36 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
void mndCleanupTopic(SMnode *pMnode) {}
|
void mndCleanupTopic(SMnode *pMnode) {}
|
||||||
|
|
||||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE;
|
int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto WTF;
|
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, WTF);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, WTF);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, WTF);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, WTF);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, WTF);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, WTF);
|
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, WTF);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, WTF);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, WTF);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, WTF);
|
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, WTF);
|
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
WTF:
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
TOPIC_ENCODE_OVER:
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
|
||||||
|
sdbFreeRaw(pRaw);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
|
||||||
return pRaw;
|
return pRaw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,10 +126,10 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
TOPIC_DECODE_OVER:
|
TOPIC_DECODE_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
|
mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
|
||||||
tfree(pRow);
|
tfree(pRow);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -799,6 +799,11 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FORCE_INLINE int32_t taosHashGetDataLen(void *data) {
|
||||||
|
SHashNode * node = GET_HASH_PNODE(data);
|
||||||
|
return node->keyLen;
|
||||||
|
}
|
||||||
|
|
||||||
FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {
|
FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {
|
||||||
SHashNode * node = GET_HASH_PNODE(data);
|
SHashNode * node = GET_HASH_PNODE(data);
|
||||||
return node->keyLen;
|
return node->keyLen;
|
||||||
|
|
Loading…
Reference in New Issue