Merge pull request #14993 from taosdata/feature/stream
feat(tmq): support background heartbeat
This commit is contained in:
commit
e4152b433c
|
@ -75,13 +75,18 @@ typedef uint16_t tmsg_t;
|
||||||
#define TSDB_IE_TYPE_DNODE_EXT 6
|
#define TSDB_IE_TYPE_DNODE_EXT 6
|
||||||
#define TSDB_IE_TYPE_DNODE_STATE 7
|
#define TSDB_IE_TYPE_DNODE_STATE 7
|
||||||
|
|
||||||
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX };
|
enum {
|
||||||
|
CONN_TYPE__QUERY = 1,
|
||||||
|
CONN_TYPE__TMQ,
|
||||||
|
CONN_TYPE__UDFD,
|
||||||
|
CONN_TYPE__MAX,
|
||||||
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
HEARTBEAT_KEY_USER_AUTHINFO = 1,
|
HEARTBEAT_KEY_USER_AUTHINFO = 1,
|
||||||
HEARTBEAT_KEY_DBINFO,
|
HEARTBEAT_KEY_DBINFO,
|
||||||
HEARTBEAT_KEY_STBINFO,
|
HEARTBEAT_KEY_STBINFO,
|
||||||
HEARTBEAT_KEY_MQ_TMP,
|
HEARTBEAT_KEY_TMQ,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum _mgmt_table {
|
typedef enum _mgmt_table {
|
||||||
|
@ -2146,6 +2151,15 @@ typedef struct {
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
} SMqAskEpReq;
|
} SMqAskEpReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t consumerId;
|
||||||
|
int32_t epoch;
|
||||||
|
} SMqHbReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t reserved;
|
||||||
|
} SMqHbRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t key;
|
int32_t key;
|
||||||
int32_t valueLen;
|
int32_t valueLen;
|
||||||
|
@ -2335,29 +2349,30 @@ static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMqHbVgInfo {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
} SMqHbVgInfo;
|
// TODO stas
|
||||||
|
} SMqReportVgInfo;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
|
static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqReportVgInfo* pVgInfo) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
|
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
|
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqReportVgInfo* pVgInfo) {
|
||||||
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
|
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMqHbTopicInfo {
|
typedef struct {
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int64_t topicUid;
|
int64_t topicUid;
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
SArray* pVgInfo; // SArray<SMqHbVgInfo>
|
SArray* pVgInfo; // SArray<SMqHbVgInfo>
|
||||||
} SMqHbTopicInfo;
|
} SMqTopicInfo;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
|
static FORCE_INLINE int32_t taosEncodeSMqTopicInfoMsg(void** buf, const SMqTopicInfo* pTopicInfo) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
|
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
|
||||||
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
|
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
|
||||||
|
@ -2365,35 +2380,35 @@ static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbT
|
||||||
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
|
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
|
SMqReportVgInfo* pVgInfo = (SMqReportVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
|
||||||
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
|
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
|
static FORCE_INLINE void* taosDecodeSMqTopicInfoMsg(void* buf, SMqTopicInfo* pTopicInfo) {
|
||||||
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
|
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
|
||||||
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
|
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
|
||||||
buf = taosDecodeStringTo(buf, pTopicInfo->name);
|
buf = taosDecodeStringTo(buf, pTopicInfo->name);
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
|
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo));
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqHbVgInfo vgInfo;
|
SMqReportVgInfo vgInfo;
|
||||||
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
|
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
|
||||||
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
|
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMqHbMsg {
|
typedef struct {
|
||||||
int32_t status; // ask hb endpoint
|
int32_t status; // ask hb endpoint
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
SArray* pTopics; // SArray<SMqHbTopicInfo>
|
SArray* pTopics; // SArray<SMqHbTopicInfo>
|
||||||
} SMqHbMsg;
|
} SMqReportReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
|
static FORCE_INLINE int32_t taosEncodeSMqReportMsg(void** buf, const SMqReportReq* pMsg) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI32(buf, pMsg->status);
|
tlen += taosEncodeFixedI32(buf, pMsg->status);
|
||||||
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
|
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
|
||||||
|
@ -2401,22 +2416,22 @@ static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
|
||||||
int32_t sz = taosArrayGetSize(pMsg->pTopics);
|
int32_t sz = taosArrayGetSize(pMsg->pTopics);
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
|
SMqTopicInfo* topicInfo = (SMqTopicInfo*)taosArrayGet(pMsg->pTopics, i);
|
||||||
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
|
tlen += taosEncodeSMqTopicInfoMsg(buf, topicInfo);
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
static FORCE_INLINE void* taosDecodeSMqReportMsg(void* buf, SMqReportReq* pMsg) {
|
||||||
buf = taosDecodeFixedI32(buf, &pMsg->status);
|
buf = taosDecodeFixedI32(buf, &pMsg->status);
|
||||||
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
|
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
|
||||||
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
|
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
|
||||||
int32_t sz;
|
int32_t sz;
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
|
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo));
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqHbTopicInfo topicInfo;
|
SMqTopicInfo topicInfo;
|
||||||
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
|
buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo);
|
||||||
taosArrayPush(pMsg->pTopics, &topicInfo);
|
taosArrayPush(pMsg->pTopics, &topicInfo);
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
|
@ -2921,89 +2936,6 @@ typedef struct {
|
||||||
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
|
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
|
||||||
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
|
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
|
||||||
|
|
||||||
#if 0
|
|
||||||
typedef struct {
|
|
||||||
SMqRspHead head;
|
|
||||||
int64_t reqOffset;
|
|
||||||
int64_t rspOffset;
|
|
||||||
int32_t skipLogNum;
|
|
||||||
int32_t blockNum;
|
|
||||||
int8_t withTbName;
|
|
||||||
int8_t withSchema;
|
|
||||||
SArray* blockDataLen; // SArray<int32_t>
|
|
||||||
SArray* blockData; // SArray<SRetrieveTableRsp*>
|
|
||||||
SArray* blockTbName; // SArray<char*>
|
|
||||||
SArray* blockSchema; // SArray<SSchemaWrapper>
|
|
||||||
} SMqDataBlkRsp;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pRsp->blockNum);
|
|
||||||
if (pRsp->blockNum != 0) {
|
|
||||||
tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
|
|
||||||
tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
|
||||||
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
|
|
||||||
void* data = taosArrayGetP(pRsp->blockData, i);
|
|
||||||
tlen += taosEncodeFixedI32(buf, bLen);
|
|
||||||
tlen += taosEncodeBinary(buf, data, bLen);
|
|
||||||
if (pRsp->withSchema) {
|
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
|
|
||||||
tlen += taosEncodeSSchemaWrapper(buf, pSW);
|
|
||||||
}
|
|
||||||
if (pRsp->withTbName) {
|
|
||||||
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
|
|
||||||
tlen += taosEncodeString(buf, tbName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
|
||||||
if (pRsp->blockNum != 0) {
|
|
||||||
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
|
||||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
|
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
|
||||||
if (pRsp->withTbName) {
|
|
||||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
|
||||||
}
|
|
||||||
if (pRsp->withSchema) {
|
|
||||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
|
||||||
int32_t bLen = 0;
|
|
||||||
void* data = NULL;
|
|
||||||
buf = taosDecodeFixedI32(buf, &bLen);
|
|
||||||
buf = taosDecodeBinary(buf, &data, bLen);
|
|
||||||
taosArrayPush(pRsp->blockDataLen, &bLen);
|
|
||||||
taosArrayPush(pRsp->blockData, &data);
|
|
||||||
if (pRsp->withSchema) {
|
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
|
||||||
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
|
||||||
taosArrayPush(pRsp->blockSchema, &pSW);
|
|
||||||
}
|
|
||||||
if (pRsp->withTbName) {
|
|
||||||
char* name = NULL;
|
|
||||||
buf = taosDecodeString(buf, &name);
|
|
||||||
taosArrayPush(pRsp->blockTbName, &name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return (void*)buf;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
|
|
|
@ -144,6 +144,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_HB, "consumer-hb", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
||||||
|
|
|
@ -571,7 +571,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void hbMgrInitMqHbHandle() {
|
static FORCE_INLINE void hbMgrInitHandle() {
|
||||||
|
// init all handle
|
||||||
clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
|
clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle;
|
||||||
clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
|
clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle;
|
||||||
|
|
||||||
|
@ -579,11 +580,6 @@ void hbMgrInitMqHbHandle() {
|
||||||
clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
|
clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void hbMgrInitHandle() {
|
|
||||||
// init all handle
|
|
||||||
hbMgrInitMqHbHandle();
|
|
||||||
}
|
|
||||||
|
|
||||||
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
||||||
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
|
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
|
||||||
if (pBatchReq == NULL) {
|
if (pBatchReq == NULL) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -196,6 +196,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_HB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -48,6 +48,7 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||||
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||||
|
@ -62,6 +63,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_HB, mndProcessMqHbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
||||||
|
@ -255,24 +257,15 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont;
|
SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont;
|
||||||
SMqAskEpRsp rsp = {0};
|
int64_t consumerId = be64toh(pReq->consumerId);
|
||||||
int64_t consumerId = be64toh(pReq->consumerId);
|
|
||||||
int32_t epoch = ntohl(pReq->epoch);
|
|
||||||
|
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->info.node, consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumer == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
|
|
||||||
/*int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);*/
|
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
|
|
||||||
// 1. check consumer status
|
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
|
||||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
@ -286,6 +279,46 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont;
|
||||||
|
SMqAskEpRsp rsp = {0};
|
||||||
|
int64_t consumerId = be64toh(pReq->consumerId);
|
||||||
|
int32_t epoch = ntohl(pReq->epoch);
|
||||||
|
|
||||||
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
|
if (pConsumer == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// 1. check consumer status
|
||||||
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||||
|
|
||||||
|
pRecoverMsg->consumerId = consumerId;
|
||||||
|
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||||
|
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER;
|
||||||
|
pRpcMsg->pCont = pRecoverMsg;
|
||||||
|
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg);
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -388,67 +388,7 @@ static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
|
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
|
||||||
#if 0
|
//
|
||||||
SClientHbRsp* pRsp = taosMemoryMalloc(sizeof(SClientHbRsp));
|
|
||||||
if (pRsp == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pRsp->connKey = pReq->connKey;
|
|
||||||
SMqHbBatchRsp batchRsp;
|
|
||||||
batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
|
|
||||||
if (batchRsp.batchRsps == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SClientHbKey connKey = pReq->connKey;
|
|
||||||
SHashObj* pObj = pReq->info;
|
|
||||||
SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
|
|
||||||
if (pKv == NULL) {
|
|
||||||
taosMemoryFree(pRsp);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SMqHbMsg mqHb;
|
|
||||||
taosDecodeSMqMsg(pKv->value, &mqHb);
|
|
||||||
/*int64_t clientUid = htonl(pKv->value);*/
|
|
||||||
/*if (mqHb.epoch )*/
|
|
||||||
int sz = taosArrayGetSize(mqHb.pTopics);
|
|
||||||
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
|
|
||||||
for (int i = 0; i < sz; i++) {
|
|
||||||
SMqHbOneTopicBatchRsp innerBatchRsp;
|
|
||||||
innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
|
|
||||||
if (innerBatchRsp.rsps == NULL) {
|
|
||||||
//TODO
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
|
|
||||||
SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
|
|
||||||
if (pConsumerTopic->epoch != topicInfo->epoch) {
|
|
||||||
//add new vgids into rsp
|
|
||||||
int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
|
|
||||||
for (int j = 0; j < vgSz; j++) {
|
|
||||||
SMqHbRsp innerRsp;
|
|
||||||
SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
|
|
||||||
SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
|
|
||||||
innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
|
||||||
taosArrayPush(innerBatchRsp.rsps, &innerRsp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
|
|
||||||
}
|
|
||||||
int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
|
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
//TODO
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
void* abuf = buf;
|
|
||||||
taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
|
|
||||||
pRsp->body = buf;
|
|
||||||
pRsp->bodyLen = tlen;
|
|
||||||
return pRsp;
|
|
||||||
#endif
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue