fix:[TS-5156]add user/fqdn in show consumers
This commit is contained in:
parent
d5b990e4ed
commit
9e018e8e58
|
@ -2838,6 +2838,7 @@ typedef struct {
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[TSDB_CLIENT_ID_LEN];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
|
char fqdn[TSDB_FQDN_LEN];
|
||||||
SArray* topicNames; // SArray<char**>
|
SArray* topicNames; // SArray<char**>
|
||||||
|
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
@ -2872,6 +2873,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
|
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
|
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
|
||||||
tlen += taosEncodeString(buf, pReq->user);
|
tlen += taosEncodeString(buf, pReq->user);
|
||||||
|
tlen += taosEncodeString(buf, pReq->fqdn);
|
||||||
|
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
@ -2907,6 +2909,7 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
|
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
|
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
|
||||||
buf = taosDecodeStringTo(buf, pReq->user);
|
buf = taosDecodeStringTo(buf, pReq->user);
|
||||||
|
buf = taosDecodeStringTo(buf, pReq->fqdn);
|
||||||
} else {
|
} else {
|
||||||
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
|
|
|
@ -83,6 +83,7 @@ struct tmq_t {
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
char clientId[TSDB_CLIENT_ID_LEN];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
|
char fqdn[TSDB_FQDN_LEN];
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
int8_t useSnapshot;
|
int8_t useSnapshot;
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
|
@ -1267,6 +1268,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->sourceExcluded = conf->sourceExcluded;
|
pTmq->sourceExcluded = conf->sourceExcluded;
|
||||||
pTmq->enableBatchMeta = conf->enableBatchMeta;
|
pTmq->enableBatchMeta = conf->enableBatchMeta;
|
||||||
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
|
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
|
||||||
|
if (taosGetFqdn(pTmq->fqdn) != 0) {
|
||||||
|
(void)strcpy(pTmq->fqdn, "localhost");
|
||||||
|
}
|
||||||
if (conf->replayEnable) {
|
if (conf->replayEnable) {
|
||||||
pTmq->autoCommit = false;
|
pTmq->autoCommit = false;
|
||||||
}
|
}
|
||||||
|
@ -1335,6 +1339,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
|
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
|
||||||
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
||||||
tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
|
tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
|
||||||
|
tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
||||||
if (req.topicNames == NULL) {
|
if (req.topicNames == NULL) {
|
||||||
|
|
|
@ -344,7 +344,9 @@ static const SSysDbTableSchema subscriptionSchema[] = {
|
||||||
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
|
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
|
{.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
@ -480,11 +482,12 @@ static const SSysDbTableSchema connectionsSchema[] = {
|
||||||
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
static const SSysDbTableSchema consumerSchema[] = {
|
static const SSysDbTableSchema consumerSchema[] = {
|
||||||
{.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
|
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
|
{.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
|
||||||
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/
|
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/
|
||||||
|
|
|
@ -598,6 +598,7 @@ typedef struct {
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char clientId[TSDB_CLIENT_ID_LEN];
|
char clientId[TSDB_CLIENT_ID_LEN];
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
|
char fqdn[TSDB_FQDN_LEN];
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
|
|
|
@ -911,6 +911,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
MND_TMQ_NULL_CHECK(pColInfo);
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
|
||||||
|
|
||||||
|
// fqdn
|
||||||
|
char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(fqdn, pConsumer->fqdn);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
|
||||||
|
|
||||||
// status
|
// status
|
||||||
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
|
||||||
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
|
|
@ -325,6 +325,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
|
||||||
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
|
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
|
||||||
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
|
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
|
||||||
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
|
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
|
||||||
|
tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
|
||||||
|
tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
|
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
|
||||||
if (pConsumer->rebNewTopics == NULL){
|
if (pConsumer->rebNewTopics == NULL){
|
||||||
|
@ -430,6 +432,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
|
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
|
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
|
||||||
tlen += taosEncodeString(buf, pConsumer->user);
|
tlen += taosEncodeString(buf, pConsumer->user);
|
||||||
|
tlen += taosEncodeString(buf, pConsumer->fqdn);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -505,6 +508,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
|
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
|
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
|
||||||
buf = taosDecodeStringTo(buf, pConsumer->user);
|
buf = taosDecodeStringTo(buf, pConsumer->user);
|
||||||
|
buf = taosDecodeStringTo(buf, pConsumer->fqdn);
|
||||||
} else{
|
} else{
|
||||||
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
|
||||||
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
|
||||||
|
|
|
@ -1330,8 +1330,8 @@ END:
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic,
|
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
|
||||||
const char *cgroup, SArray *vgs, SArray *offsetRows) {
|
const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t sz = taosArrayGetSize(vgs);
|
int32_t sz = taosArrayGetSize(vgs);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
|
@ -1355,7 +1355,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
|
||||||
|
|
||||||
// consumer id
|
// consumer id
|
||||||
char consumerIdHex[32] = {0};
|
char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
|
||||||
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, consumerId);
|
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, consumerId);
|
||||||
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
|
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
|
||||||
|
|
||||||
|
@ -1363,6 +1363,18 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons
|
||||||
MND_TMQ_NULL_CHECK(pColInfo);
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
|
||||||
|
|
||||||
|
char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
if (user) STR_TO_VARSTR(userStr, user);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
|
||||||
|
|
||||||
|
char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
MND_TMQ_NULL_CHECK(pColInfo);
|
||||||
|
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
|
||||||
|
|
||||||
mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
|
mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
|
||||||
varDataVal(cgroup), pVgEp->vgId);
|
varDataVal(cgroup), pVgEp->vgId);
|
||||||
|
|
||||||
|
@ -1435,16 +1447,25 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
|
|
||||||
SMqConsumerEp *pConsumerEp = NULL;
|
SMqConsumerEp *pConsumerEp = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
|
||||||
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs,
|
char *user = NULL;
|
||||||
|
char *fqdn = NULL;
|
||||||
|
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
|
||||||
|
if (pConsumer != NULL) {
|
||||||
|
user = pConsumer->user;
|
||||||
|
fqdn = pConsumer->fqdn;
|
||||||
|
sdbRelease(pSdb, pConsumer);
|
||||||
|
}
|
||||||
|
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
|
||||||
pConsumerEp->offsetRows));
|
pConsumerEp->offsetRows));
|
||||||
}
|
}
|
||||||
|
|
||||||
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
|
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
|
||||||
|
|
||||||
pBlock->info.rows = numOfRows;
|
pBlock->info.rows = numOfRows;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue