fix:[TS-5156]add user in show consumers
This commit is contained in:
parent
fa7afafe6d
commit
d5b990e4ed
|
@ -2837,6 +2837,7 @@ typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[TSDB_CLIENT_ID_LEN];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
SArray* topicNames; // SArray<char**>
|
SArray* topicNames; // SArray<char**>
|
||||||
|
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
@ -2870,6 +2871,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
|
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
|
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
|
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
|
||||||
|
tlen += taosEncodeString(buf, pReq->user);
|
||||||
|
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
@ -2904,6 +2906,7 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
|
||||||
if ((char*)buf - (char*)start < len) {
|
if ((char*)buf - (char*)start < len) {
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
|
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
|
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
|
||||||
|
buf = taosDecodeStringTo(buf, pReq->user);
|
||||||
} else {
|
} else {
|
||||||
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
|
|
|
@ -82,6 +82,7 @@ struct tmq_t {
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
char clientId[TSDB_CLIENT_ID_LEN];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
int8_t useSnapshot;
|
int8_t useSnapshot;
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
|
@ -1265,6 +1266,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->replayEnable = conf->replayEnable;
|
pTmq->replayEnable = conf->replayEnable;
|
||||||
pTmq->sourceExcluded = conf->sourceExcluded;
|
pTmq->sourceExcluded = conf->sourceExcluded;
|
||||||
pTmq->enableBatchMeta = conf->enableBatchMeta;
|
pTmq->enableBatchMeta = conf->enableBatchMeta;
|
||||||
|
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
|
||||||
if (conf->replayEnable) {
|
if (conf->replayEnable) {
|
||||||
pTmq->autoCommit = false;
|
pTmq->autoCommit = false;
|
||||||
}
|
}
|
||||||
|
@ -1332,6 +1334,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
|
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
|
||||||
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
||||||
|
tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
|
||||||
|
|
||||||
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
||||||
if (req.topicNames == NULL) {
|
if (req.topicNames == NULL) {
|
||||||
|
|
|
@ -597,6 +597,7 @@ typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[TSDB_CLIENT_ID_LEN];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
|
char user[TSDB_USER_LEN];
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
|
|
|
@ -903,6 +903,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
MND_TMQ_NULL_CHECK(pColInfo);
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
|
||||||
|
|
||||||
|
// user
|
||||||
|
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(user, pConsumer->user);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
|
||||||
|
|
||||||
// status
|
// status
|
||||||
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
||||||
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
|
|
@ -429,6 +429,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
|
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
|
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
|
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
|
||||||
|
tlen += taosEncodeString(buf, pConsumer->user);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,6 +504,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
||||||
if (sver > 2){
|
if (sver > 2){
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
|
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
|
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
|
||||||
|
buf = taosDecodeStringTo(buf, pConsumer->user);
|
||||||
} else{
|
} else{
|
||||||
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
|
|
Loading…
Reference in New Issue