feat:[TS-4592] clear lost status for consumer

This commit is contained in:
wangmm0220 2024-07-10 18:42:20 +08:00
parent c3e09d0852
commit b28437aaba
9 changed files with 228 additions and 131 deletions

View File

@ -2772,6 +2772,9 @@ enum {
TOPIC_SUB_TYPE__COLUMN, TOPIC_SUB_TYPE__COLUMN,
}; };
#define DEFAULT_MAX_POLL_INTERVAL 3000000
#define DEFAULT_SESSION_TIMEOUT 10000
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists; int8_t igExists;
@ -2794,7 +2797,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
SArray* topicNames; // SArray<char**> SArray* topicNames; // SArray<char**>
int8_t withTbName; int8_t withTbName;
@ -2803,6 +2806,8 @@ typedef struct {
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t enableReplay; int8_t enableReplay;
int8_t enableBatchMeta; int8_t enableBatchMeta;
int32_t sessionTimeoutMs;
int32_t maxPollIntervalMs;
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
@ -2824,11 +2829,14 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
return tlen; return tlen;
} }
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq, int32_t len) {
void* start = buf;
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeStringTo(buf, pReq->clientId); buf = taosDecodeStringTo(buf, pReq->clientId);
@ -2849,6 +2857,14 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
if (buf - start < len) {
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
} else {
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
}
return buf; return buf;
} }
@ -4060,6 +4076,7 @@ typedef struct {
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
SArray* topics; SArray* topics;
int8_t pollFlag;
} SMqHbReq; } SMqHbReq;
typedef struct { typedef struct {

View File

@ -221,6 +221,8 @@ typedef enum ELogicConditionType {
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string
#define TSDB_CLIENT_ID_LEN 256 // it is a null-terminated string
#define TSDB_CONSUMER_ID_LEN 32 // it is a null-terminated string
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string

View File

@ -37,6 +37,7 @@ struct SMqMgmt {
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
volatile int32_t tmqInitRes = 0; // initialize rsp code volatile int32_t tmqInitRes = 0; // initialize rsp code
static struct SMqMgmt tmqMgmt = {0}; static struct SMqMgmt tmqMgmt = {0};
static int8_t pollFlag = 0;
typedef struct { typedef struct {
int32_t code; int32_t code;
@ -56,7 +57,7 @@ struct tmq_list_t {
}; };
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
@ -66,6 +67,9 @@ struct tmq_conf_t {
int8_t sourceExcluded; // do not consume, bit int8_t sourceExcluded; // do not consume, bit
uint16_t port; uint16_t port;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t sessionTimeoutMs;
int32_t heartBeatIntervalMs;
int32_t maxPollIntervalMs;
char* ip; char* ip;
char* user; char* user;
char* pass; char* pass;
@ -77,15 +81,18 @@ struct tmq_conf_t {
struct tmq_t { struct tmq_t {
int64_t refId; int64_t refId;
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
int8_t withTbName; int8_t withTbName;
int8_t useSnapshot; int8_t useSnapshot;
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t sessionTimeoutMs;
int32_t heartBeatIntervalMs;
int32_t maxPollIntervalMs;
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t replayEnable; int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit int8_t sourceExcluded; // do not consume, bit
uint64_t consumerId; int64_t consumerId;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
int8_t enableBatchMeta; int8_t enableBatchMeta;
@ -272,6 +279,9 @@ tmq_conf_t* tmq_conf_new() {
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
conf->enableBatchMeta = false; conf->enableBatchMeta = false;
conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
return conf; return conf;
} }
@ -301,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "client.id") == 0) { if (strcasecmp(key, "client.id") == 0) {
tstrncpy(conf->clientId, value, 256); tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -318,7 +328,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "auto.commit.interval.ms") == 0) { if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
conf->autoCommitInterval = taosStr2int64(value); int64_t tmp = taosStr2int64(value);
if (tmp < 0 || EINVAL == errno || ERANGE == errno) {
return TMQ_CONF_INVALID;
}
conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "session.timeout.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 6000 || tmp > 1800000){
return TMQ_CONF_INVALID;
}
conf->sessionTimeoutMs = tmp;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){
return TMQ_CONF_INVALID;
}
conf->heartBeatIntervalMs = tmp;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "max.poll.interval.ms") == 0) {
int64_t tmp = taosStr2int64(value);
if (tmp < 1000 || tmp > INT32_MAX){
return TMQ_CONF_INVALID;
}
conf->maxPollIntervalMs = tmp;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -377,7 +418,12 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
if (strcasecmp(key, "td.connect.port") == 0) { if (strcasecmp(key, "td.connect.port") == 0) {
conf->port = taosStr2int64(value); int64_t tmp = taosStr2int64(value);
if (tmp <= 0 || tmp > 65535) {
return TMQ_CONF_INVALID;
}
conf->port = tmp;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
@ -813,6 +859,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMqHbReq req = {0}; SMqHbReq req = {0};
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch; req.epoch = tmq->epoch;
req.pollFlag = atomic_load_8(&pollFlag);
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
@ -878,9 +925,10 @@ void tmqSendHbReq(void* param, void* tmrId) {
tscError("tmqSendHbReq asyncSendMsgToServer failed"); tscError("tmqSendHbReq asyncSendMsgToServer failed");
} }
atomic_val_compare_exchange_8(&pollFlag, 1, 0);
OVER: OVER:
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
} }
@ -1134,6 +1182,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->useSnapshot = conf->snapEnable; pTmq->useSnapshot = conf->snapEnable;
pTmq->autoCommit = conf->autoCommit; pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
pTmq->commitCb = conf->commitCb; pTmq->commitCb = conf->commitCb;
pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
@ -1173,7 +1224,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer); pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, pRefId, tmqMgmt.timer);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
@ -1203,7 +1254,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256); tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
@ -1215,6 +1266,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.withTbName = tmq->withTbName; req.withTbName = tmq->withTbName;
req.autoCommit = tmq->autoCommit; req.autoCommit = tmq->autoCommit;
req.autoCommitInterval = tmq->autoCommitInterval; req.autoCommitInterval = tmq->autoCommitInterval;
req.sessionTimeoutMs = tmq->sessionTimeoutMs;
req.maxPollIntervalMs = tmq->maxPollIntervalMs;
req.resetOffsetCfg = tmq->resetOffsetCfg; req.resetOffsetCfg = tmq->resetOffsetCfg;
req.enableReplay = tmq->replayEnable; req.enableReplay = tmq->replayEnable;
req.enableBatchMeta = tmq->enableBatchMeta; req.enableBatchMeta = tmq->enableBatchMeta;
@ -2207,6 +2260,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
} }
} }
atomic_val_compare_exchange_8(&pollFlag, 0, 1);
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);

View File

@ -482,16 +482,16 @@ static const SSysDbTableSchema connectionsSchema[] = {
static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "parameters", .bytes = 128 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };
static const SSysDbTableSchema offsetSchema[] = { static const SSysDbTableSchema offsetSchema[] = {

View File

@ -7002,6 +7002,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
} }
} }
if (tEncodeI8(&encoder, pReq->pollFlag) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -7041,6 +7042,9 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
} }
} }
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->pollFlag) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -596,11 +596,12 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[TSDB_CLIENT_ID_LEN];
int8_t updateType; // used only for update int8_t updateType; // used only for update
int32_t epoch; int32_t epoch;
int32_t status; int32_t status;
int32_t hbStatus; // hbStatus is not applicable to serialization int32_t hbStatus; // hbStatus is not applicable to serialization
int32_t pollStatus; // pollStatus is not applicable to serialization
SRWLatch lock; // lock is used for topics update SRWLatch lock; // lock is used for topics update
SArray* currentTopics; // SArray<char*> SArray* currentTopics; // SArray<char*>
SArray* rebNewTopics; // SArray<char*> SArray* rebNewTopics; // SArray<char*>
@ -620,6 +621,8 @@ typedef struct {
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
int32_t sessionTimeoutMs;
int32_t maxPollIntervalMs;
} SMqConsumerObj; } SMqConsumerObj;
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe); SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe);

View File

@ -25,7 +25,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
#define MND_CONSUMER_VER_NUMBER 2 #define MND_CONSUMER_VER_NUMBER 3
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
#define MND_MAX_GROUP_PER_TOPIC 100 #define MND_MAX_GROUP_PER_TOPIC 100
@ -40,7 +40,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); //static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
int32_t mndInitConsumer(SMnode *pMnode) { int32_t mndInitConsumer(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
@ -57,7 +57,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); // mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); // mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
@ -144,56 +144,56 @@ FAILED:
return code; return code;
} }
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { //static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
int32_t code = 0; // int32_t code = 0;
SMnode *pMnode = pMsg->info.node; // SMnode *pMnode = pMsg->info.node;
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; // SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
SMqConsumerObj *pConsumerNew = NULL; // SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL; // STrans *pTrans = NULL;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); // SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
if (pConsumer == NULL) { // if (pConsumer == NULL) {
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); // mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
code = -1; // code = -1;
goto END; // goto END;
} // }
//
mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, // mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
pConsumer->status, mndConsumerStatusName(pConsumer->status)); // pConsumer->status, mndConsumerStatusName(pConsumer->status));
//
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { // if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; // terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
code = -1; // code = -1;
goto END; // goto END;
} // }
//
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); // pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL);
if (pConsumerNew == NULL){ // if (pConsumerNew == NULL){
code = -1; // code = -1;
goto END; // goto END;
} // }
//
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); // pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
if (pTrans == NULL) { // if (pTrans == NULL) {
code = -1; // code = -1;
goto END; // goto END;
} // }
code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); // code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
if (code != 0) { // if (code != 0) {
goto END; // goto END;
} // }
//
code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); // code = mndSetConsumerCommitLogs(pTrans, pConsumerNew);
if (code != 0) { // if (code != 0) {
goto END; // goto END;
} // }
//
code = mndTransPrepare(pMnode, pTrans); // code = mndTransPrepare(pMnode, pTrans);
END: //END:
mndReleaseConsumer(pMnode, pConsumer); // mndReleaseConsumer(pMnode, pConsumer);
tDeleteSMqConsumerObj(pConsumerNew); // tDeleteSMqConsumerObj(pConsumerNew);
mndTransDrop(pTrans); // mndTransDrop(pTrans);
return code; // return code;
} //}
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
@ -328,13 +328,15 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
} }
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
if (req.pollFlag == 1){
int32_t status = atomic_load_32(&pConsumer->status); atomic_store_32(&pConsumer->pollStatus, 0);
if (status == MQ_CONSUMER_STATUS_LOST) {
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
} }
// int32_t status = atomic_load_32(&pConsumer->status);
//
// if (status == MQ_CONSUMER_STATUS_LOST) {
// mInfo("try to recover consumer:0x%" PRIx64, consumerId);
// mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
// }
storeOffsetRows(pMnode, &req, pConsumer); storeOffsetRows(pMnode, &req, pConsumer);
code = buildMqHbRsp(pMsg, &rsp); code = buildMqHbRsp(pMsg, &rsp);
@ -480,14 +482,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
goto END; goto END;
} }
atomic_store_32(&pConsumer->hbStatus, 0);
// 1. check consumer status // 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
//
if (status == MQ_CONSUMER_STATUS_LOST) { // if (status == MQ_CONSUMER_STATUS_LOST) {
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); // mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
} // }
if (status != MQ_CONSUMER_STATUS_READY) { if (status != MQ_CONSUMER_STATUS_READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
@ -652,7 +652,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SCMSubscribeReq subscribe = {0}; SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe); tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen);
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL; STrans *pTrans = NULL;
@ -806,17 +806,17 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
return 0; return 0;
} }
static void updateConsumerStatus(SMqConsumerObj *pConsumer) { //static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
int32_t status = pConsumer->status; // int32_t status = pConsumer->status;
//
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { // if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS_REBALANCE) { // if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS_READY; // pConsumer->status = MQ_CONSUMER_STATUS_READY;
} else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { // } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
pConsumer->status = MQ_CONSUMER_STATUS_LOST; // pConsumer->status = MQ_CONSUMER_STATUS_LOST;
} // }
} // }
} //}
// remove from topic list // remove from topic list
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
@ -863,14 +863,14 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->subscribeTime = taosGetTimestampMs();
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { // } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); // int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { // for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); // char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
taosArrayPush(pOldConsumer->rebNewTopics, &topic); // taosArrayPush(pOldConsumer->rebNewTopics, &topic);
} // }
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; // pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); // mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
@ -889,7 +889,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
} }
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); // updateConsumerStatus(pOldConsumer);
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
}
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
@ -906,7 +910,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); // updateConsumerStatus(pOldConsumer);
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
}
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
@ -973,7 +980,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
int32_t cols = 0; int32_t cols = 0;
// consumer id // consumer id
char consumerIdHex[32] = {0}; char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId); sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
@ -988,19 +995,20 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
// client id // client id
char clientId[256 + VARSTR_HEADER_SIZE] = {0}; char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(clientId, pConsumer->clientId); STR_TO_VARSTR(clientId, pConsumer->clientId);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false); colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0};
const char *pStatusName = mndConsumerStatusName(pConsumer->status); const char *pStatusName = mndConsumerStatusName(pConsumer->status);
char *status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
STR_TO_VARSTR(status, pStatusName); STR_TO_VARSTR(status, pStatusName);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)status, false); colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
taosMemoryFree(status);
// one subscribed topic // one subscribed topic
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1033,14 +1041,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; char *parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s,maxPoll:%d,timeout:%d", pConsumer->withTbName,
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); pConsumer->autoCommit, pConsumer->autoCommitInterval, buf, pConsumer->maxPollIntervalMs, pConsumer->sessionTimeoutMs);
varDataSetLen(parasStr, strlen(varDataVal(parasStr))); varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false); colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false);
taosMemoryFree(parasStr);
numOfRows++; numOfRows++;
} }
@ -1063,8 +1071,8 @@ const char *mndConsumerStatusName(int status) {
switch (status) { switch (status) {
case MQ_CONSUMER_STATUS_READY: case MQ_CONSUMER_STATUS_READY:
return "ready"; return "ready";
case MQ_CONSUMER_STATUS_LOST: // case MQ_CONSUMER_STATUS_LOST:
return "lost"; // return "lost";
case MQ_CONSUMER_STATUS_REBALANCE: case MQ_CONSUMER_STATUS_REBALANCE:
return "rebalancing"; return "rebalancing";
default: default:

View File

@ -266,6 +266,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda
pConsumer->epoch = 0; pConsumer->epoch = 0;
pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
pConsumer->hbStatus = 0; pConsumer->hbStatus = 0;
pConsumer->pollStatus = 0;
taosInitRWLatch(&pConsumer->lock); taosInitRWLatch(&pConsumer->lock);
pConsumer->createTime = taosGetTimestampMs(); pConsumer->createTime = taosGetTimestampMs();
@ -294,6 +295,8 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda
pConsumer->autoCommit = subscribe->autoCommit; pConsumer->autoCommit = subscribe->autoCommit;
pConsumer->autoCommitInterval = subscribe->autoCommitInterval; pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
@ -396,6 +399,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
return tlen; return tlen;
} }
@ -456,6 +461,14 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
} }
if (sver > 2){
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
} else{
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
}
return (void *)buf; return (void *)buf;
} }

View File

@ -27,8 +27,7 @@
#define MND_SUBSCRIBE_VER_NUMBER 3 #define MND_SUBSCRIBE_VER_NUMBER 3
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 6 //#define MND_CONSUMER_LOST_HB_CNT 6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
static int32_t mqRebInExecCnt = 0; static int32_t mqRebInExecCnt = 0;
@ -234,7 +233,7 @@ static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, c
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0; int32_t actualRemoved = 0;
for (int32_t i = 0; i < numOfRemoved; i++) { for (int32_t i = 0; i < numOfRemoved; i++) {
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
if (pConsumerEp == NULL) { if (pConsumerEp == NULL) {
continue; continue;
@ -378,12 +377,10 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
} }
} }
if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { if (taosArrayGetSize(newVgs) != 0) {
taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs); taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs);
mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
taosArrayDestroy(newVgs); taosArrayDestroy(newVgs);
} else {
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
} }
return totalVgNum; return totalVgNum;
} }
@ -678,7 +675,7 @@ static void freeRebalanceItem(void *param) {
static void buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) { static void buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) {
int32_t topicNum = taosArrayGetSize(topicList); int32_t topicNum = taosArrayGetSize(topicList);
for (int32_t i = 0; i < topicNum; i++) { for (int32_t i = 0; i < topicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
char *removedTopic = taosArrayGetP(topicList, i); char *removedTopic = taosArrayGetP(topicList, i);
mndMakeSubscribeKey(key, group, removedTopic); mndMakeSubscribeKey(key, group, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
@ -707,7 +704,7 @@ static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHash
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
if (!pVgroup) { if (!pVgroup) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN] = {0};
mndMakeSubscribeKey(key, pConsumer->cgroup, topic); mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
mndGetOrCreateRebSub(rebSubHash, key); mndGetOrCreateRebSub(rebSubHash, key);
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
@ -733,27 +730,25 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
} }
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
", hbstatus:%d", ", hbstatus:%d, pollStatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
pConsumer->createTime, hbStatus); pConsumer->createTime, hbStatus, pollStatus);
if (status == MQ_CONSUMER_STATUS_READY) { if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} else { } else {
checkForVgroupSplit(pMnode, pConsumer, rebSubHash); checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
} }
} else if (status == MQ_CONSUMER_STATUS_LOST) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
}
} else { } else {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId);
@ -832,8 +827,8 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
if (pSub == NULL) { if (pSub == NULL) {
// split sub key and extract topic // split sub key and extract topic
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(key, topic, cgroup, true); mndSplitSubscribeKey(key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
@ -878,7 +873,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
mDebug("[rebalance] start to process mq timer") mDebug("[rebalance] start to process mq timer")
if (!mndRebTryStart()) { if (!mndRebTryStart()) {
mInfo("[rebalance] mq rebalance already in progress, do nothing") return code; mInfo("[rebalance] mq rebalance already in progress, do nothing") return code;
} }