Merge pull request #26522 from taosdata/feat/TS-4592
feat:[TS-4592]remove lost status for consumer
This commit is contained in:
commit
7d82036903
|
@ -2810,6 +2810,9 @@ enum {
|
||||||
TOPIC_SUB_TYPE__COLUMN,
|
TOPIC_SUB_TYPE__COLUMN,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define DEFAULT_MAX_POLL_INTERVAL 3000000
|
||||||
|
#define DEFAULT_SESSION_TIMEOUT 12000
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
|
@ -2832,7 +2835,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[256];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
SArray* topicNames; // SArray<char**>
|
SArray* topicNames; // SArray<char**>
|
||||||
|
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
@ -2841,6 +2844,8 @@ typedef struct {
|
||||||
int8_t resetOffsetCfg;
|
int8_t resetOffsetCfg;
|
||||||
int8_t enableReplay;
|
int8_t enableReplay;
|
||||||
int8_t enableBatchMeta;
|
int8_t enableBatchMeta;
|
||||||
|
int32_t sessionTimeoutMs;
|
||||||
|
int32_t maxPollIntervalMs;
|
||||||
} SCMSubscribeReq;
|
} SCMSubscribeReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
|
||||||
|
@ -2862,11 +2867,14 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
|
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
|
tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
|
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
|
||||||
|
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
|
static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq, int32_t len) {
|
||||||
|
void* start = buf;
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
||||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
||||||
buf = taosDecodeStringTo(buf, pReq->clientId);
|
buf = taosDecodeStringTo(buf, pReq->clientId);
|
||||||
|
@ -2892,6 +2900,14 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
|
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
|
buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
|
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
|
||||||
|
if ((char*)buf - (char*)start < len) {
|
||||||
|
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
|
||||||
|
} else {
|
||||||
|
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
|
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4119,6 +4135,7 @@ typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
SArray* topics;
|
SArray* topics;
|
||||||
|
int8_t pollFlag;
|
||||||
} SMqHbReq;
|
} SMqHbReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -221,6 +221,8 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
||||||
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
||||||
|
#define TSDB_CLIENT_ID_LEN 256 // it is a null-terminated string
|
||||||
|
#define TSDB_CONSUMER_ID_LEN 32 // it is a null-terminated string
|
||||||
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
|
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
|
||||||
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
|
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
|
||||||
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
|
||||||
|
|
|
@ -37,6 +37,7 @@ struct SMqMgmt {
|
||||||
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
|
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
|
||||||
volatile int32_t tmqInitRes = 0; // initialize rsp code
|
volatile int32_t tmqInitRes = 0; // initialize rsp code
|
||||||
static struct SMqMgmt tmqMgmt = {0};
|
static struct SMqMgmt tmqMgmt = {0};
|
||||||
|
static int8_t pollFlag = 0;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
@ -56,7 +57,7 @@ struct tmq_list_t {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct tmq_conf_t {
|
struct tmq_conf_t {
|
||||||
char clientId[256];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int8_t resetOffset;
|
int8_t resetOffset;
|
||||||
|
@ -66,6 +67,9 @@ struct tmq_conf_t {
|
||||||
int8_t sourceExcluded; // do not consume, bit
|
int8_t sourceExcluded; // do not consume, bit
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
int32_t autoCommitInterval;
|
int32_t autoCommitInterval;
|
||||||
|
int32_t sessionTimeoutMs;
|
||||||
|
int32_t heartBeatIntervalMs;
|
||||||
|
int32_t maxPollIntervalMs;
|
||||||
char* ip;
|
char* ip;
|
||||||
char* user;
|
char* user;
|
||||||
char* pass;
|
char* pass;
|
||||||
|
@ -77,15 +81,18 @@ struct tmq_conf_t {
|
||||||
struct tmq_t {
|
struct tmq_t {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
char clientId[256];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
int8_t useSnapshot;
|
int8_t useSnapshot;
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int32_t autoCommitInterval;
|
int32_t autoCommitInterval;
|
||||||
|
int32_t sessionTimeoutMs;
|
||||||
|
int32_t heartBeatIntervalMs;
|
||||||
|
int32_t maxPollIntervalMs;
|
||||||
int8_t resetOffsetCfg;
|
int8_t resetOffsetCfg;
|
||||||
int8_t replayEnable;
|
int8_t replayEnable;
|
||||||
int8_t sourceExcluded; // do not consume, bit
|
int8_t sourceExcluded; // do not consume, bit
|
||||||
uint64_t consumerId;
|
int64_t consumerId;
|
||||||
tmq_commit_cb* commitCb;
|
tmq_commit_cb* commitCb;
|
||||||
void* commitCbUserParam;
|
void* commitCbUserParam;
|
||||||
int8_t enableBatchMeta;
|
int8_t enableBatchMeta;
|
||||||
|
@ -266,6 +273,9 @@ tmq_conf_t* tmq_conf_new() {
|
||||||
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
|
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
|
||||||
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
|
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
|
||||||
conf->enableBatchMeta = false;
|
conf->enableBatchMeta = false;
|
||||||
|
conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
|
||||||
|
conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
|
conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
|
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
@ -295,7 +305,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcasecmp(key, "client.id") == 0) {
|
if (strcasecmp(key, "client.id") == 0) {
|
||||||
tstrncpy(conf->clientId, value, 256);
|
tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN);
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +322,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
|
if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
|
||||||
conf->autoCommitInterval = taosStr2int64(value);
|
int64_t tmp = taosStr2int64(value);
|
||||||
|
if (tmp < 0 || EINVAL == errno || ERANGE == errno) {
|
||||||
|
return TMQ_CONF_INVALID;
|
||||||
|
}
|
||||||
|
conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp);
|
||||||
|
return TMQ_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strcasecmp(key, "session.timeout.ms") == 0) {
|
||||||
|
int64_t tmp = taosStr2int64(value);
|
||||||
|
if (tmp < 6000 || tmp > 1800000){
|
||||||
|
return TMQ_CONF_INVALID;
|
||||||
|
}
|
||||||
|
conf->sessionTimeoutMs = tmp;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strcasecmp(key, "heartbeat.interval.ms") == 0) {
|
||||||
|
int64_t tmp = taosStr2int64(value);
|
||||||
|
if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){
|
||||||
|
return TMQ_CONF_INVALID;
|
||||||
|
}
|
||||||
|
conf->heartBeatIntervalMs = tmp;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strcasecmp(key, "max.poll.interval.ms") == 0) {
|
||||||
|
int64_t tmp = taosStr2int64(value);
|
||||||
|
if (tmp < 1000 || tmp > INT32_MAX){
|
||||||
|
return TMQ_CONF_INVALID;
|
||||||
|
}
|
||||||
|
conf->maxPollIntervalMs = tmp;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,7 +412,12 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcasecmp(key, "td.connect.port") == 0) {
|
if (strcasecmp(key, "td.connect.port") == 0) {
|
||||||
conf->port = taosStr2int64(value);
|
int64_t tmp = taosStr2int64(value);
|
||||||
|
if (tmp <= 0 || tmp > 65535) {
|
||||||
|
return TMQ_CONF_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf->port = tmp;
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,6 +871,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
SMqHbReq req = {0};
|
SMqHbReq req = {0};
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
req.epoch = tmq->epoch;
|
req.epoch = tmq->epoch;
|
||||||
|
req.pollFlag = atomic_load_8(&pollFlag);
|
||||||
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
|
||||||
if (req.topics == NULL){
|
if (req.topics == NULL){
|
||||||
return;
|
return;
|
||||||
|
@ -906,10 +953,11 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
tscError("tmqSendHbReq asyncSendMsgToServer failed");
|
tscError("tmqSendHbReq asyncSendMsgToServer failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
atomic_val_compare_exchange_8(&pollFlag, 1, 0);
|
||||||
OVER:
|
OVER:
|
||||||
tDestroySMqHbReq(&req);
|
tDestroySMqHbReq(&req);
|
||||||
if (tmrId != NULL) {
|
if(tmrId != NULL){
|
||||||
(void)taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
(void)taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
||||||
}
|
}
|
||||||
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
|
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||||
}
|
}
|
||||||
|
@ -1208,6 +1256,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->useSnapshot = conf->snapEnable;
|
pTmq->useSnapshot = conf->snapEnable;
|
||||||
pTmq->autoCommit = conf->autoCommit;
|
pTmq->autoCommit = conf->autoCommit;
|
||||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||||
|
pTmq->sessionTimeoutMs = conf->sessionTimeoutMs;
|
||||||
|
pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs;
|
||||||
|
pTmq->maxPollIntervalMs = conf->maxPollIntervalMs;
|
||||||
pTmq->commitCb = conf->commitCb;
|
pTmq->commitCb = conf->commitCb;
|
||||||
pTmq->commitCbUserParam = conf->commitCbUserParam;
|
pTmq->commitCbUserParam = conf->commitCbUserParam;
|
||||||
pTmq->resetOffsetCfg = conf->resetOffset;
|
pTmq->resetOffsetCfg = conf->resetOffset;
|
||||||
|
@ -1246,7 +1297,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
goto _failed;
|
goto _failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, (void*)pTmq->refId, tmqMgmt.timer);
|
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, (void*)pTmq->refId, tmqMgmt.timer);
|
||||||
if (pTmq->hbLiveTimer == NULL) {
|
if (pTmq->hbLiveTimer == NULL) {
|
||||||
SET_ERROR_MSG_TMQ("start heartbeat timer failed")
|
SET_ERROR_MSG_TMQ("start heartbeat timer failed")
|
||||||
goto _failed;
|
goto _failed;
|
||||||
|
@ -1279,7 +1330,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
|
tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
|
||||||
|
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
tstrncpy(req.clientId, tmq->clientId, 256);
|
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
|
||||||
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
||||||
|
|
||||||
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
||||||
|
@ -1291,6 +1342,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
req.withTbName = tmq->withTbName;
|
req.withTbName = tmq->withTbName;
|
||||||
req.autoCommit = tmq->autoCommit;
|
req.autoCommit = tmq->autoCommit;
|
||||||
req.autoCommitInterval = tmq->autoCommitInterval;
|
req.autoCommitInterval = tmq->autoCommitInterval;
|
||||||
|
req.sessionTimeoutMs = tmq->sessionTimeoutMs;
|
||||||
|
req.maxPollIntervalMs = tmq->maxPollIntervalMs;
|
||||||
req.resetOffsetCfg = tmq->resetOffsetCfg;
|
req.resetOffsetCfg = tmq->resetOffsetCfg;
|
||||||
req.enableReplay = tmq->replayEnable;
|
req.enableReplay = tmq->replayEnable;
|
||||||
req.enableBatchMeta = tmq->enableBatchMeta;
|
req.enableBatchMeta = tmq->enableBatchMeta;
|
||||||
|
@ -2343,6 +2396,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
atomic_val_compare_exchange_8(&pollFlag, 0, 1);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
tmqHandleAllDelayedTask(tmq);
|
tmqHandleAllDelayedTask(tmq);
|
||||||
|
|
||||||
|
|
|
@ -482,16 +482,16 @@ static const SSysDbTableSchema connectionsSchema[] = {
|
||||||
|
|
||||||
|
|
||||||
static const SSysDbTableSchema consumerSchema[] = {
|
static const SSysDbTableSchema consumerSchema[] = {
|
||||||
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/
|
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/
|
||||||
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
{.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "parameters", .bytes = 128 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema offsetSchema[] = {
|
static const SSysDbTableSchema offsetSchema[] = {
|
||||||
|
|
|
@ -7083,6 +7083,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tEncodeI8(&encoder, pReq->pollFlag) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -7122,6 +7123,9 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
|
if (tDecodeI8(&decoder, &pReq->pollFlag) < 0) return -1;
|
||||||
|
}
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
enum {
|
enum {
|
||||||
MQ_CONSUMER_STATUS_REBALANCE = 1,
|
MQ_CONSUMER_STATUS_REBALANCE = 1,
|
||||||
MQ_CONSUMER_STATUS_READY,
|
MQ_CONSUMER_STATUS_READY,
|
||||||
MQ_CONSUMER_STATUS_LOST,
|
// MQ_CONSUMER_STATUS_LOST,
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t mndInitConsumer(SMnode *pMnode);
|
int32_t mndInitConsumer(SMnode *pMnode);
|
||||||
|
|
|
@ -596,11 +596,12 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[256];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
int32_t hbStatus; // hbStatus is not applicable to serialization
|
int32_t hbStatus; // hbStatus is not applicable to serialization
|
||||||
|
int32_t pollStatus; // pollStatus is not applicable to serialization
|
||||||
SRWLatch lock; // lock is used for topics update
|
SRWLatch lock; // lock is used for topics update
|
||||||
SArray* currentTopics; // SArray<char*>
|
SArray* currentTopics; // SArray<char*>
|
||||||
SArray* rebNewTopics; // SArray<char*>
|
SArray* rebNewTopics; // SArray<char*>
|
||||||
|
@ -620,6 +621,8 @@ typedef struct {
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int32_t autoCommitInterval;
|
int32_t autoCommitInterval;
|
||||||
int32_t resetOffsetCfg;
|
int32_t resetOffsetCfg;
|
||||||
|
int32_t sessionTimeoutMs;
|
||||||
|
int32_t maxPollIntervalMs;
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define MND_CONSUMER_VER_NUMBER 2
|
#define MND_CONSUMER_VER_NUMBER 3
|
||||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||||
|
|
||||||
#define MND_MAX_GROUP_PER_TOPIC 100
|
#define MND_MAX_GROUP_PER_TOPIC 100
|
||||||
|
@ -55,7 +55,6 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
||||||
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
||||||
|
@ -238,11 +237,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
|
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
|
||||||
MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
|
MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
if (req.pollFlag == 1){
|
||||||
if (status == MQ_CONSUMER_STATUS_LOST) {
|
atomic_store_32(&pConsumer->pollStatus, 0);
|
||||||
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
|
|
||||||
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
storeOffsetRows(pMnode, &req, pConsumer);
|
storeOffsetRows(pMnode, &req, pConsumer);
|
||||||
code = buildMqHbRsp(pMsg, &rsp);
|
code = buildMqHbRsp(pMsg, &rsp);
|
||||||
|
|
||||||
|
@ -389,11 +387,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
|
||||||
|
// 1. check consumer status
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
if (status == MQ_CONSUMER_STATUS_LOST) {
|
|
||||||
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info));
|
|
||||||
}
|
|
||||||
if (status != MQ_CONSUMER_STATUS_READY) {
|
if (status != MQ_CONSUMER_STATUS_READY) {
|
||||||
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
|
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
|
||||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
|
@ -566,7 +562,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
SCMSubscribeReq subscribe = {0};
|
SCMSubscribeReq subscribe = {0};
|
||||||
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe));
|
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
|
||||||
if(taosArrayGetSize(subscribe.topicNames) == 0){
|
if(taosArrayGetSize(subscribe.topicNames) == 0){
|
||||||
SMqConsumerObj *pConsumerTmp = NULL;
|
SMqConsumerObj *pConsumerTmp = NULL;
|
||||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
|
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
|
||||||
|
@ -701,17 +697,17 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
|
//static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
|
||||||
int32_t status = pConsumer->status;
|
// int32_t status = pConsumer->status;
|
||||||
|
//
|
||||||
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
|
// if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
|
||||||
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
|
// if (status == MQ_CONSUMER_STATUS_REBALANCE) {
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_READY;
|
// pConsumer->status = MQ_CONSUMER_STATUS_READY;
|
||||||
} else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
|
// } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
// pConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
//}
|
||||||
|
|
||||||
// remove from topic list
|
// remove from topic list
|
||||||
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
|
static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
|
||||||
|
@ -757,21 +753,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
pOldConsumer->subscribeTime = taosGetTimestampMs();
|
pOldConsumer->subscribeTime = taosGetTimestampMs();
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
||||||
mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
|
mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
|
|
||||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
void * tmp = taosArrayGetP(pOldConsumer->assignedTopics, i);
|
|
||||||
if (tmp == NULL){
|
|
||||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
|
||||||
}
|
|
||||||
char *topic = taosStrdup(tmp);
|
|
||||||
if (taosArrayPush(pOldConsumer->rebNewTopics, &topic) == NULL) {
|
|
||||||
taosMemoryFree(topic);
|
|
||||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
|
||||||
mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId);
|
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
|
||||||
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||||
|
@ -796,7 +777,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t status = pOldConsumer->status;
|
int32_t status = pOldConsumer->status;
|
||||||
updateConsumerStatus(pOldConsumer);
|
// updateConsumerStatus(pOldConsumer);
|
||||||
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||||
|
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
|
||||||
|
}
|
||||||
|
|
||||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||||
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
|
|
||||||
|
@ -816,7 +801,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
|
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
|
||||||
|
|
||||||
int32_t status = pOldConsumer->status;
|
int32_t status = pOldConsumer->status;
|
||||||
updateConsumerStatus(pOldConsumer);
|
// updateConsumerStatus(pOldConsumer);
|
||||||
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||||
|
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
|
||||||
|
}
|
||||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||||
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
(void)atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
|
|
||||||
|
@ -852,6 +840,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SMqConsumerObj *pConsumer = NULL;
|
SMqConsumerObj *pConsumer = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
char *parasStr = NULL;
|
||||||
|
char *status = NULL;
|
||||||
|
|
||||||
while (numOfRows < rowsCapacity) {
|
while (numOfRows < rowsCapacity) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||||
|
@ -884,7 +874,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
// consumer id
|
// consumer id
|
||||||
char consumerIdHex[32] = {0};
|
char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
|
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
|
||||||
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
|
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
|
||||||
|
|
||||||
|
@ -901,7 +891,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false));
|
||||||
|
|
||||||
// client id
|
// client id
|
||||||
char clientId[256 + VARSTR_HEADER_SIZE] = {0};
|
char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(clientId, pConsumer->clientId);
|
STR_TO_VARSTR(clientId, pConsumer->clientId);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -909,13 +899,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
|
||||||
|
|
||||||
// status
|
// status
|
||||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
||||||
|
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
MND_TMQ_NULL_CHECK(status);
|
||||||
STR_TO_VARSTR(status, pStatusName);
|
STR_TO_VARSTR(status, pStatusName);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
MND_TMQ_NULL_CHECK(pColInfo);
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)status, false));
|
||||||
|
taosMemoryFreeClear(status);
|
||||||
|
|
||||||
// one subscribed topic
|
// one subscribed topic
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -948,7 +940,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
|
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
|
||||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
||||||
|
|
||||||
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
MND_TMQ_NULL_CHECK(parasStr);
|
||||||
(void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
(void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
||||||
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
||||||
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
||||||
|
@ -956,7 +949,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
MND_TMQ_NULL_CHECK(pColInfo);
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false));
|
||||||
|
taosMemoryFreeClear(parasStr);
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -970,6 +963,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
taosMemoryFreeClear(status);
|
||||||
|
taosMemoryFreeClear(parasStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -982,8 +977,8 @@ const char *mndConsumerStatusName(int status) {
|
||||||
switch (status) {
|
switch (status) {
|
||||||
case MQ_CONSUMER_STATUS_READY:
|
case MQ_CONSUMER_STATUS_READY:
|
||||||
return "ready";
|
return "ready";
|
||||||
case MQ_CONSUMER_STATUS_LOST:
|
// case MQ_CONSUMER_STATUS_LOST:
|
||||||
return "lost";
|
// return "lost";
|
||||||
case MQ_CONSUMER_STATUS_REBALANCE:
|
case MQ_CONSUMER_STATUS_REBALANCE:
|
||||||
return "rebalancing";
|
return "rebalancing";
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -288,6 +288,7 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
||||||
pConsumer->epoch = 0;
|
pConsumer->epoch = 0;
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
||||||
pConsumer->hbStatus = 0;
|
pConsumer->hbStatus = 0;
|
||||||
|
pConsumer->pollStatus = 0;
|
||||||
|
|
||||||
taosInitRWLatch(&pConsumer->lock);
|
taosInitRWLatch(&pConsumer->lock);
|
||||||
pConsumer->createTime = taosGetTimestampMs();
|
pConsumer->createTime = taosGetTimestampMs();
|
||||||
|
@ -322,6 +323,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
||||||
pConsumer->autoCommit = subscribe->autoCommit;
|
pConsumer->autoCommit = subscribe->autoCommit;
|
||||||
pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
|
pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
|
||||||
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
|
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
|
||||||
|
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
|
||||||
|
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
|
||||||
|
|
||||||
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
|
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
|
||||||
if (pConsumer->rebNewTopics == NULL){
|
if (pConsumer->rebNewTopics == NULL){
|
||||||
|
@ -424,6 +427,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||||
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
|
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
|
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
|
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -495,6 +500,14 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
|
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
|
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
|
||||||
}
|
}
|
||||||
|
if (sver > 2){
|
||||||
|
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
|
||||||
|
} else{
|
||||||
|
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
|
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
|
}
|
||||||
|
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,8 +27,7 @@
|
||||||
#define MND_SUBSCRIBE_VER_NUMBER 3
|
#define MND_SUBSCRIBE_VER_NUMBER 3
|
||||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||||
|
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
//#define MND_CONSUMER_LOST_HB_CNT 6
|
||||||
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
|
||||||
|
|
||||||
static int32_t mqRebInExecCnt = 0;
|
static int32_t mqRebInExecCnt = 0;
|
||||||
|
|
||||||
|
@ -331,6 +330,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t totalVgNum = 0;
|
int32_t totalVgNum = 0;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
|
SMqVgEp *pVgEp = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
|
SArray *newVgs = taosArrayInit(0, POINTER_BYTES);
|
||||||
MND_TMQ_NULL_CHECK(newVgs);
|
MND_TMQ_NULL_CHECK(newVgs);
|
||||||
|
@ -346,11 +346,12 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
totalVgNum++;
|
totalVgNum++;
|
||||||
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||||
MND_TMQ_NULL_CHECK(pVgEp);
|
MND_TMQ_NULL_CHECK(pVgEp);
|
||||||
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
pVgEp->vgId = pVgroup->vgId;
|
pVgEp->vgId = pVgroup->vgId;
|
||||||
MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
|
MND_TMQ_NULL_CHECK(taosArrayPush(newVgs, &pVgEp));
|
||||||
|
pVgEp = NULL;
|
||||||
sdbRelease(pMnode->pSdb, pVgroup);
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,13 +362,13 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
while (j < taosArrayGetSize(pConsumerEp->vgs)) {
|
while (j < taosArrayGetSize(pConsumerEp->vgs)) {
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||||
MND_TMQ_NULL_CHECK(pVgEp);
|
MND_TMQ_NULL_CHECK(pVgEpTmp);
|
||||||
bool find = false;
|
bool find = false;
|
||||||
for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
|
for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
|
||||||
SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
|
SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
|
||||||
MND_TMQ_NULL_CHECK(pnewVgEp);
|
MND_TMQ_NULL_CHECK(pnewVgEp);
|
||||||
if (pVgEp->vgId == pnewVgEp->vgId) {
|
if (pVgEpTmp->vgId == pnewVgEp->vgId) {
|
||||||
tDeleteSMqVgEp(pnewVgEp);
|
tDeleteSMqVgEp(pnewVgEp);
|
||||||
taosArrayRemove(newVgs, k);
|
taosArrayRemove(newVgs, k);
|
||||||
find = true;
|
find = true;
|
||||||
|
@ -375,8 +376,8 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!find) {
|
if (!find) {
|
||||||
mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId);
|
mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
|
||||||
tDeleteSMqVgEp(pVgEp);
|
tDeleteSMqVgEp(pVgEpTmp);
|
||||||
taosArrayRemove(pConsumerEp->vgs, j);
|
taosArrayRemove(pConsumerEp->vgs, j);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -387,12 +388,16 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
||||||
if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
|
if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
|
||||||
MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
|
MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
|
||||||
mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
|
mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
|
||||||
(void)taosArrayDestroy(newVgs);
|
taosArrayDestroy(newVgs);
|
||||||
} else {
|
} else {
|
||||||
(void)taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
}
|
}
|
||||||
return totalVgNum;
|
return totalVgNum;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
|
taosMemoryFree(pVgEp);
|
||||||
|
taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -758,32 +763,32 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||||
|
int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1);
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
|
||||||
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
|
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64
|
||||||
", hbstatus:%d",
|
", hbstatus:%d, pollStatus:%d",
|
||||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
|
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime,
|
||||||
pConsumer->createTime, hbStatus);
|
pConsumer->createTime, hbStatus, pollStatus);
|
||||||
|
|
||||||
if (status == MQ_CONSUMER_STATUS_READY) {
|
if (status == MQ_CONSUMER_STATUS_READY) {
|
||||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
|
if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close
|
||||||
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
|
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
|
||||||
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
} else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs ||
|
||||||
|
pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
|
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
} else {
|
} else {
|
||||||
checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
|
checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
|
||||||
}
|
}
|
||||||
} else if (status == MQ_CONSUMER_STATUS_LOST) {
|
} else if (status == MQ_CONSUMER_STATUS_REBALANCE) {
|
||||||
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
|
|
||||||
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId));
|
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId));
|
||||||
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
|
MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId));
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
|
} else {
|
||||||
|
MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info));
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
@ -1013,37 +1018,37 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
|
//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
|
||||||
void *pIter = NULL;
|
// void *pIter = NULL;
|
||||||
SMqConsumerObj *pConsumer = NULL;
|
// SMqConsumerObj *pConsumer = NULL;
|
||||||
int code = 0;
|
// int code = 0;
|
||||||
while (1) {
|
// while (1) {
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
if (pIter == NULL) {
|
// if (pIter == NULL) {
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// drop consumer in lost status, other consumers not in lost status already deleted by rebalance
|
// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance
|
||||||
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
|
// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
// sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
// for (int32_t i = 0; i < sz; i++) {
|
||||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
// char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||||
if (name && strcmp(topic, name) == 0) {
|
// if (name && strcmp(topic, name) == 0) {
|
||||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
|
// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
// sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
END:
|
//END:
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
// sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
// sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
return code;
|
// return code;
|
||||||
}
|
//}
|
||||||
|
|
||||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
@ -1079,7 +1084,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
||||||
mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup);
|
mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup);
|
||||||
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
||||||
MND_TMQ_RETURN_CHECK(mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
|
|
||||||
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
|
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
|
||||||
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
|
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
|
||||||
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
||||||
|
|
|
@ -613,44 +613,44 @@ static bool checkTopic(SArray *topics, char *topicName){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
|
//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
|
||||||
int32_t code = 0;
|
// int32_t code = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
// SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
// void *pIter = NULL;
|
||||||
SMqConsumerObj *pConsumer = NULL;
|
// SMqConsumerObj *pConsumer = NULL;
|
||||||
while (1) {
|
// while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
if (pIter == NULL) {
|
// if (pIter == NULL) {
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
// bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||||
if (found){
|
// if (found){
|
||||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
||||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
|
// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
|
||||||
sdbRelease(pSdb, pConsumer);
|
// sdbRelease(pSdb, pConsumer);
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
// topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
goto END;
|
// goto END;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
|
// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
|
||||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
// topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
goto END;
|
// goto END;
|
||||||
}
|
// }
|
||||||
sdbRelease(pSdb, pConsumer);
|
// sdbRelease(pSdb, pConsumer);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
END:
|
//END:
|
||||||
sdbRelease(pSdb, pConsumer);
|
// sdbRelease(pSdb, pConsumer);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
// sdbCancelFetch(pSdb, pIter);
|
||||||
return code;
|
// return code;
|
||||||
}
|
//}
|
||||||
|
|
||||||
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
|
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
|
||||||
// broadcast to all vnode
|
// broadcast to all vnode
|
||||||
|
@ -722,9 +722,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||||
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
||||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
|
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
|
||||||
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
|
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
|
||||||
MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name));
|
// MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name));
|
||||||
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
|
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
|
||||||
|
|
||||||
if (pTopic->ntbUid != 0) {
|
if (pTopic->ntbUid != 0) {
|
||||||
|
|
|
@ -596,6 +596,7 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
tmq_conf_set(conf, "msg.consume.excluded", "1");
|
tmq_conf_set(conf, "msg.consume.excluded", "1");
|
||||||
|
// tmq_conf_set(conf, "max.poll.interval.ms", "20000");
|
||||||
|
|
||||||
if (g_conf.snapShot) {
|
if (g_conf.snapShot) {
|
||||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
|
Loading…
Reference in New Issue