diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a83aa4da44..c812138282 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2772,6 +2772,9 @@ enum { TOPIC_SUB_TYPE__COLUMN, }; +#define DEFAULT_MAX_POLL_INTERVAL 3000000 +#define DEFAULT_SESSION_TIMEOUT 10000 + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic int8_t igExists; @@ -2794,7 +2797,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2803,6 +2806,8 @@ typedef struct { int8_t resetOffsetCfg; int8_t enableReplay; int8_t enableBatchMeta; + int32_t sessionTimeoutMs; + int32_t maxPollIntervalMs; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2824,11 +2829,14 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); + tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); + tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); return tlen; } -static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { +static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq, int32_t len) { + void* start = buf; buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->clientId); @@ -2849,6 +2857,14 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); + if (buf - start < len) { + buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); + buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); + } else { + pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + } + return buf; } @@ -4060,6 +4076,7 @@ typedef struct { int64_t consumerId; int32_t epoch; SArray* topics; + int8_t pollFlag; } SMqHbReq; typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index 9c2858ed30..70358c861c 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -221,6 +221,8 @@ typedef enum ELogicConditionType { #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string +#define TSDB_CLIENT_ID_LEN 256 // it is a null-terminated string +#define TSDB_CONSUMER_ID_LEN 32 // it is a null-terminated string #define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 21d1a528da..2921b7b333 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -37,6 +37,7 @@ struct SMqMgmt { static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once volatile int32_t tmqInitRes = 0; // initialize rsp code static struct SMqMgmt tmqMgmt = {0}; +static int8_t pollFlag = 0; typedef struct { int32_t code; @@ -56,7 +57,7 @@ struct tmq_list_t { }; struct tmq_conf_t { - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; char groupId[TSDB_CGROUP_LEN]; int8_t autoCommit; int8_t resetOffset; @@ -66,6 +67,9 @@ struct tmq_conf_t { int8_t sourceExcluded; // do not consume, bit uint16_t port; int32_t autoCommitInterval; + int32_t sessionTimeoutMs; + int32_t heartBeatIntervalMs; + int32_t maxPollIntervalMs; char* ip; char* user; char* pass; @@ -77,15 +81,18 @@ struct tmq_conf_t { struct tmq_t { int64_t refId; char groupId[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; + int32_t sessionTimeoutMs; + int32_t heartBeatIntervalMs; + int32_t maxPollIntervalMs; int8_t resetOffsetCfg; int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit - uint64_t consumerId; + int64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; int8_t enableBatchMeta; @@ -272,6 +279,9 @@ tmq_conf_t* tmq_conf_new() { conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->enableBatchMeta = false; + conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL; + conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; return conf; } @@ -301,7 +311,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "client.id") == 0) { - tstrncpy(conf->clientId, value, 256); + tstrncpy(conf->clientId, value, TSDB_CLIENT_ID_LEN); return TMQ_CONF_OK; } @@ -318,7 +328,38 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "auto.commit.interval.ms") == 0) { - conf->autoCommitInterval = taosStr2int64(value); + int64_t tmp = taosStr2int64(value); + if (tmp < 0 || EINVAL == errno || ERANGE == errno) { + return TMQ_CONF_INVALID; + } + conf->autoCommitInterval = (tmp > INT32_MAX ? INT32_MAX : tmp); + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "session.timeout.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 6000 || tmp > 1800000){ + return TMQ_CONF_INVALID; + } + conf->sessionTimeoutMs = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "heartbeat.interval.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 1000 || tmp >= conf->sessionTimeoutMs){ + return TMQ_CONF_INVALID; + } + conf->heartBeatIntervalMs = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "max.poll.interval.ms") == 0) { + int64_t tmp = taosStr2int64(value); + if (tmp < 1000 || tmp > INT32_MAX){ + return TMQ_CONF_INVALID; + } + conf->maxPollIntervalMs = tmp; return TMQ_CONF_OK; } @@ -377,7 +418,12 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "td.connect.port") == 0) { - conf->port = taosStr2int64(value); + int64_t tmp = taosStr2int64(value); + if (tmp <= 0 || tmp > 65535) { + return TMQ_CONF_INVALID; + } + + conf->port = tmp; return TMQ_CONF_OK; } @@ -813,6 +859,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; + req.pollFlag = atomic_load_8(&pollFlag); taosRLockLatch(&tmq->lock); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -878,9 +925,10 @@ void tmqSendHbReq(void* param, void* tmrId) { tscError("tmqSendHbReq asyncSendMsgToServer failed"); } + atomic_val_compare_exchange_8(&pollFlag, 1, 0); OVER: tDestroySMqHbReq(&req); - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); + taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1134,6 +1182,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->useSnapshot = conf->snapEnable; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; + pTmq->sessionTimeoutMs = conf->sessionTimeoutMs; + pTmq->heartBeatIntervalMs = conf->heartBeatIntervalMs; + pTmq->maxPollIntervalMs = conf->maxPollIntervalMs; pTmq->commitCb = conf->commitCb; pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; @@ -1173,7 +1224,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer); + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, pTmq->heartBeatIntervalMs, pRefId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; @@ -1203,7 +1254,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); req.consumerId = tmq->consumerId; - tstrncpy(req.clientId, tmq->clientId, 256); + tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -1215,6 +1266,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.withTbName = tmq->withTbName; req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; + req.sessionTimeoutMs = tmq->sessionTimeoutMs; + req.maxPollIntervalMs = tmq->maxPollIntervalMs; req.resetOffsetCfg = tmq->resetOffsetCfg; req.enableReplay = tmq->replayEnable; req.enableBatchMeta = tmq->enableBatchMeta; @@ -2207,6 +2260,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } + atomic_val_compare_exchange_8(&pollFlag, 0, 1); + while (1) { tmqHandleAllDelayedTask(tmq); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 0c0073b4a7..2d69a687a6 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -482,16 +482,16 @@ static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema consumerSchema[] = { - {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, - {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, - {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "parameters", .bytes = 128 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema offsetSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 10719674f5..3612e6553c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7002,6 +7002,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { } } + if (tEncodeI8(&encoder, pReq->pollFlag) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7041,6 +7042,9 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { } } } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->pollFlag) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 089c4a10b3..b700c440a5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -596,11 +596,12 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char clientId[256]; + char clientId[TSDB_CLIENT_ID_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; int32_t hbStatus; // hbStatus is not applicable to serialization + int32_t pollStatus; // pollStatus is not applicable to serialization SRWLatch lock; // lock is used for topics update SArray* currentTopics; // SArray SArray* rebNewTopics; // SArray @@ -620,6 +621,8 @@ typedef struct { int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; + int32_t sessionTimeoutMs; + int32_t maxPollIntervalMs; } SMqConsumerObj; SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9a7a8155ec..c37739252c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -25,7 +25,7 @@ #include "tcompare.h" #include "tname.h" -#define MND_CONSUMER_VER_NUMBER 2 +#define MND_CONSUMER_VER_NUMBER 3 #define MND_CONSUMER_RESERVE_SIZE 64 #define MND_MAX_GROUP_PER_TOPIC 100 @@ -40,7 +40,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); -static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); +//static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); int32_t mndInitConsumer(SMnode *pMnode) { SSdbTable table = { @@ -57,7 +57,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); // mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); - mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); +// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer); @@ -144,56 +144,56 @@ FAILED: return code; } -static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { - int32_t code = 0; - SMnode *pMnode = pMsg->info.node; - SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; - SMqConsumerObj *pConsumerNew = NULL; - STrans *pTrans = NULL; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); - if (pConsumer == NULL) { - mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); - code = -1; - goto END; - } - - mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, - pConsumer->status, mndConsumerStatusName(pConsumer->status)); - - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - code = -1; - goto END; - } - - pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); - if (pConsumerNew == NULL){ - code = -1; - goto END; - } - - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); - if (pTrans == NULL) { - code = -1; - goto END; - } - code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); - if (code != 0) { - goto END; - } - - code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); - if (code != 0) { - goto END; - } - - code = mndTransPrepare(pMnode, pTrans); -END: - mndReleaseConsumer(pMnode, pConsumer); - tDeleteSMqConsumerObj(pConsumerNew); - mndTransDrop(pTrans); - return code; -} +//static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { +// int32_t code = 0; +// SMnode *pMnode = pMsg->info.node; +// SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; +// SMqConsumerObj *pConsumerNew = NULL; +// STrans *pTrans = NULL; +// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); +// if (pConsumer == NULL) { +// mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); +// code = -1; +// goto END; +// } +// +// mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, +// pConsumer->status, mndConsumerStatusName(pConsumer->status)); +// +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { +// terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; +// code = -1; +// goto END; +// } +// +// pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); +// if (pConsumerNew == NULL){ +// code = -1; +// goto END; +// } +// +// pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); +// if (pTrans == NULL) { +// code = -1; +// goto END; +// } +// code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); +// if (code != 0) { +// goto END; +// } +// +// code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); +// if (code != 0) { +// goto END; +// } +// +// code = mndTransPrepare(pMnode, pTrans); +//END: +// mndReleaseConsumer(pMnode, pConsumer); +// tDeleteSMqConsumerObj(pConsumerNew); +// mndTransDrop(pTrans); +// return code; +//} static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { int32_t code = 0; @@ -328,13 +328,15 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } atomic_store_32(&pConsumer->hbStatus, 0); - - int32_t status = atomic_load_32(&pConsumer->status); - - if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64, consumerId); - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); + if (req.pollFlag == 1){ + atomic_store_32(&pConsumer->pollStatus, 0); } +// int32_t status = atomic_load_32(&pConsumer->status); +// +// if (status == MQ_CONSUMER_STATUS_LOST) { +// mInfo("try to recover consumer:0x%" PRIx64, consumerId); +// mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); +// } storeOffsetRows(pMnode, &req, pConsumer); code = buildMqHbRsp(pMsg, &rsp); @@ -480,14 +482,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { goto END; } - atomic_store_32(&pConsumer->hbStatus, 0); - // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); - - if (status == MQ_CONSUMER_STATUS_LOST) { - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); - } +// +// if (status == MQ_CONSUMER_STATUS_LOST) { +// mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); +// } if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); @@ -652,7 +652,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t code = 0; SCMSubscribeReq subscribe = {0}; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); + tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen); SMqConsumerObj *pConsumerNew = NULL; STrans *pTrans = NULL; @@ -806,17 +806,17 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } -static void updateConsumerStatus(SMqConsumerObj *pConsumer) { - int32_t status = pConsumer->status; - - if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS_REBALANCE) { - pConsumer->status = MQ_CONSUMER_STATUS_READY; - } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { - pConsumer->status = MQ_CONSUMER_STATUS_LOST; - } - } -} +//static void updateConsumerStatus(SMqConsumerObj *pConsumer) { +// int32_t status = pConsumer->status; +// +// if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { +// if (status == MQ_CONSUMER_STATUS_REBALANCE) { +// pConsumer->status = MQ_CONSUMER_STATUS_READY; +// } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { +// pConsumer->status = MQ_CONSUMER_STATUS_LOST; +// } +// } +//} // remove from topic list static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { @@ -863,14 +863,14 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { - int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); - taosArrayPush(pOldConsumer->rebNewTopics, &topic); - } - pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); +// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { +// int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); +// for (int32_t i = 0; i < sz; i++) { +// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); +// taosArrayPush(pOldConsumer->rebNewTopics, &topic); +// } +// pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; +// mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -889,7 +889,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); +// updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } + pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -906,7 +910,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); +// updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -973,7 +980,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * int32_t cols = 0; // consumer id - char consumerIdHex[32] = {0}; + char consumerIdHex[TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE] = {0}; sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); @@ -988,19 +995,20 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); // client id - char clientId[256 + VARSTR_HEADER_SIZE] = {0}; + char clientId[TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(clientId, pConsumer->clientId); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false); // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; const char *pStatusName = mndConsumerStatusName(pConsumer->status); + char *status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); STR_TO_VARSTR(status, pStatusName); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)status, false); + taosMemoryFree(status); // one subscribed topic pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1033,14 +1041,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); - char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; - sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, - pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); + char *parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s,maxPoll:%d,timeout:%d", pConsumer->withTbName, + pConsumer->autoCommit, pConsumer->autoCommitInterval, buf, pConsumer->maxPollIntervalMs, pConsumer->sessionTimeoutMs); varDataSetLen(parasStr, strlen(varDataVal(parasStr))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false); - + taosMemoryFree(parasStr); numOfRows++; } @@ -1063,8 +1071,8 @@ const char *mndConsumerStatusName(int status) { switch (status) { case MQ_CONSUMER_STATUS_READY: return "ready"; - case MQ_CONSUMER_STATUS_LOST: - return "lost"; +// case MQ_CONSUMER_STATUS_LOST: +// return "lost"; case MQ_CONSUMER_STATUS_REBALANCE: return "rebalancing"; default: diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5164557184..1373691cdd 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -266,6 +266,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda pConsumer->epoch = 0; pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->hbStatus = 0; + pConsumer->pollStatus = 0; taosInitRWLatch(&pConsumer->lock); pConsumer->createTime = taosGetTimestampMs(); @@ -294,6 +295,8 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda pConsumer->autoCommit = subscribe->autoCommit; pConsumer->autoCommitInterval = subscribe->autoCommitInterval; pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; + pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs; + pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs; pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); @@ -396,6 +399,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); + tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); + tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); return tlen; } @@ -456,6 +461,14 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); } + if (sver > 2){ + buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); + buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); + } else{ + pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; + pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + } + return (void *)buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e2bedc258a..d2615d7b2f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -27,8 +27,7 @@ #define MND_SUBSCRIBE_VER_NUMBER 3 #define MND_SUBSCRIBE_RESERVE_SIZE 64 -#define MND_CONSUMER_LOST_HB_CNT 6 -#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 +//#define MND_CONSUMER_LOST_HB_CNT 6 static int32_t mqRebInExecCnt = 0; @@ -234,7 +233,7 @@ static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, c int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; for (int32_t i = 0; i < numOfRemoved; i++) { - uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); + int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); if (pConsumerEp == NULL) { continue; @@ -378,12 +377,10 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } - if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { + if (taosArrayGetSize(newVgs) != 0) { taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); taosArrayDestroy(newVgs); - } else { - taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); } return totalVgNum; } @@ -678,7 +675,7 @@ static void freeRebalanceItem(void *param) { static void buildRebInfo(SHashObj *rebSubHash, SArray *topicList, int8_t type, char *group, int64_t consumerId) { int32_t topicNum = taosArrayGetSize(topicList); for (int32_t i = 0; i < topicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; + char key[TSDB_SUBSCRIBE_KEY_LEN] = {0}; char *removedTopic = taosArrayGetP(topicList, i); mndMakeSubscribeKey(key, group, removedTopic); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); @@ -707,7 +704,7 @@ static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHash SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); if (!pVgroup) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; + char key[TSDB_SUBSCRIBE_KEY_LEN] = {0}; mndMakeSubscribeKey(key, pConsumer->cgroup, topic); mndGetOrCreateRebSub(rebSubHash, key); mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); @@ -733,27 +730,25 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { } int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); + int32_t pollStatus = atomic_add_fetch_32(&pConsumer->pollStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 - ", hbstatus:%d", + ", hbstatus:%d, pollStatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, - pConsumer->createTime, hbStatus); + pConsumer->createTime, hbStatus, pollStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); - } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs || + pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); } else { checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else if (status == MQ_CONSUMER_STATUS_LOST) { - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); - } } else { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); @@ -832,8 +827,8 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu if (pSub == NULL) { // split sub key and extract topic - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); @@ -878,7 +873,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; mDebug("[rebalance] start to process mq timer") - if (!mndRebTryStart()) { + if (!mndRebTryStart()) { mInfo("[rebalance] mq rebalance already in progress, do nothing") return code; }