diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70cf9c8b58..63fe4271d9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2837,6 +2837,7 @@ typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2870,6 +2871,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); + tlen += taosEncodeString(buf, pReq->user); return tlen; } @@ -2904,6 +2906,7 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR if ((char*)buf - (char*)start < len) { buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); + buf = taosDecodeStringTo(buf, pReq->user); } else { pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 197a65add8..61037c2e68 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -82,6 +82,7 @@ struct tmq_t { int64_t refId; char groupId[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; @@ -1265,6 +1266,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; pTmq->enableBatchMeta = conf->enableBatchMeta; + tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (conf->replayEnable) { pTmq->autoCommit = false; } @@ -1332,6 +1334,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); + tstrncpy(req.user, tmq->user, TSDB_USER_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 62e77867f6..0505f604a2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -597,6 +597,7 @@ typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6116d2da19..5f3f794ef9 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -903,6 +903,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false)); + // user + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(user, pConsumer->user); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false)); + // status const char *pStatusName = mndConsumerStatusName(pConsumer->status); status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 695bf4d30d..c7de16d824 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -429,6 +429,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); + tlen += taosEncodeString(buf, pConsumer->user); return tlen; } @@ -503,6 +504,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s if (sver > 2){ buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); + buf = taosDecodeStringTo(buf, pConsumer->user); } else{ pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;