From d5b990e4ed2e77eaf23b2a865b6716d4b901799a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 8 Aug 2024 16:38:11 +0800 Subject: [PATCH 1/3] fix:[TS-5156]add user in show consumers --- include/common/tmsg.h | 3 +++ source/client/src/clientTmq.c | 3 +++ source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 8 ++++++++ source/dnode/mnode/impl/src/mndDef.c | 2 ++ 5 files changed, 17 insertions(+) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70cf9c8b58..63fe4271d9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2837,6 +2837,7 @@ typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2870,6 +2871,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); + tlen += taosEncodeString(buf, pReq->user); return tlen; } @@ -2904,6 +2906,7 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR if ((char*)buf - (char*)start < len) { buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); + buf = taosDecodeStringTo(buf, pReq->user); } else { pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 197a65add8..61037c2e68 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -82,6 +82,7 @@ struct tmq_t { int64_t refId; char groupId[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; @@ -1265,6 +1266,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; pTmq->enableBatchMeta = conf->enableBatchMeta; + tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (conf->replayEnable) { pTmq->autoCommit = false; } @@ -1332,6 +1334,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); + tstrncpy(req.user, tmq->user, TSDB_USER_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 62e77867f6..0505f604a2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -597,6 +597,7 @@ typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6116d2da19..5f3f794ef9 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -903,6 +903,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false)); + // user + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(user, pConsumer->user); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false)); + // status const char *pStatusName = mndConsumerStatusName(pConsumer->status); status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 695bf4d30d..c7de16d824 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -429,6 +429,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); + tlen += taosEncodeString(buf, pConsumer->user); return tlen; } @@ -503,6 +504,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s if (sver > 2){ buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); + buf = taosDecodeStringTo(buf, pConsumer->user); } else{ pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; From 9e018e8e58d6ce05db6bb3443ae487017e6b9fa7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 14:28:58 +0800 Subject: [PATCH 2/3] fix:[TS-5156]add user/fqdn in show consumers --- include/common/tmsg.h | 3 +++ source/client/src/clientTmq.c | 5 ++++ source/common/src/systable.c | 7 +++-- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 8 ++++++ source/dnode/mnode/impl/src/mndDef.c | 4 +++ source/dnode/mnode/impl/src/mndSubscribe.c | 31 ++++++++++++++++++---- 7 files changed, 52 insertions(+), 7 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 63fe4271d9..736267c3b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2838,6 +2838,7 @@ typedef struct { char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; char user[TSDB_USER_LEN]; + char fqdn[TSDB_FQDN_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2872,6 +2873,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); tlen += taosEncodeString(buf, pReq->user); + tlen += taosEncodeString(buf, pReq->fqdn); return tlen; } @@ -2907,6 +2909,7 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); buf = taosDecodeStringTo(buf, pReq->user); + buf = taosDecodeStringTo(buf, pReq->fqdn); } else { pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 61037c2e68..a69af05900 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -83,6 +83,7 @@ struct tmq_t { char groupId[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; char user[TSDB_USER_LEN]; + char fqdn[TSDB_FQDN_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; @@ -1267,6 +1268,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->sourceExcluded = conf->sourceExcluded; pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); + if (taosGetFqdn(pTmq->fqdn) != 0) { + (void)strcpy(pTmq->fqdn, "localhost"); + } if (conf->replayEnable) { pTmq->autoCommit = false; } @@ -1335,6 +1339,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); tstrncpy(req.user, tmq->user, TSDB_USER_LEN); + tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2d69a687a6..3f27ab2b2b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -344,7 +344,9 @@ static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, }; @@ -480,11 +482,12 @@ static const SSysDbTableSchema connectionsSchema[] = { {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; - static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0505f604a2..99e59662ac 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -598,6 +598,7 @@ typedef struct { char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; char user[TSDB_USER_LEN]; + char fqdn[TSDB_FQDN_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 5f3f794ef9..37eb899ed5 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -911,6 +911,14 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false)); + // fqdn + char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(fqdn, pConsumer->fqdn); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false)); + // status const char *pStatusName = mndConsumerStatusName(pConsumer->status); status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index c7de16d824..c604e58588 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -325,6 +325,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs; pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs; + tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN); + tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN); pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); if (pConsumer->rebNewTopics == NULL){ @@ -430,6 +432,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); tlen += taosEncodeString(buf, pConsumer->user); + tlen += taosEncodeString(buf, pConsumer->fqdn); return tlen; } @@ -505,6 +508,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); buf = taosDecodeStringTo(buf, pConsumer->user); + buf = taosDecodeStringTo(buf, pConsumer->fqdn); } else{ pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index bff313dbaf..db5bb2eacd 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1330,8 +1330,8 @@ END: TAOS_RETURN(code); } -static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic, - const char *cgroup, SArray *vgs, SArray *offsetRows) { +static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn, + const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) { int32_t code = 0; int32_t sz = taosArrayGetSize(vgs); for (int32_t j = 0; j < sz; j++) { @@ -1355,7 +1355,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false)); // consumer id - char consumerIdHex[32] = {0}; + char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0}; (void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, consumerId); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); @@ -1363,6 +1363,18 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1)); + char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + if (user) STR_TO_VARSTR(userStr, user); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL)); + + char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0}; + if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL)); + mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId, varDataVal(cgroup), pVgEp->vgId); @@ -1435,16 +1447,25 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock SMqConsumerEp *pConsumerEp = NULL; void *pIter = NULL; + while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; pConsumerEp = (SMqConsumerEp *)pIter; - MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, + char *user = NULL; + char *fqdn = NULL; + SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId); + if (pConsumer != NULL) { + user = pConsumer->user; + fqdn = pConsumer->fqdn; + sdbRelease(pSdb, pConsumer); + } + MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows)); } - MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows)); + MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows)); pBlock->info.rows = numOfRows; From 78f991b2c9db9aac551a4b0cbde767f6990da936 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 16:56:35 +0800 Subject: [PATCH 3/3] fix:[TS-5156]case error --- tests/system-test/0-others/information_schema.py | 4 ++-- tests/system-test/7-tmq/tmq_primary_key.py | 10 +++++----- tests/system-test/7-tmq/tmq_taosx.py | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index d7a5540544..616cd034ab 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,10 +222,10 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 269)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 271)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") - tdSql.checkEqual(54, len(tdSql.queryResult)) + tdSql.checkEqual(56, len(tdSql.queryResult)) def ins_dnodes_check(self): tdSql.execute('drop database if exists db2') diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 80888ddbe6..13d6bd565d 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -85,7 +85,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -196,7 +196,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -306,7 +306,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -416,7 +416,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -517,7 +517,7 @@ class TDTestCase: consumer.close() tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index d0e682cffb..4e90aefe7c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -598,12 +598,12 @@ class TDTestCase: tdSql.query(f'show consumers') tdSql.checkRows(1) tdSql.checkData(0, 1, 'g1') - tdSql.checkData(0, 4, 't2') + tdSql.checkData(0, 6, 't2') tdSql.execute(f'drop consumer group g1 on t1') tdSql.query(f'show consumers') tdSql.checkRows(1) tdSql.checkData(0, 1, 'g1') - tdSql.checkData(0, 4, 't2') + tdSql.checkData(0, 6, 't2') tdSql.query(f'show subscriptions') tdSql.checkRows(1) @@ -641,7 +641,7 @@ class TDTestCase: tdSql.query(f'show consumers') tdSql.checkRows(1) tdSql.checkData(0, 1, 'g1') - tdSql.checkData(0, 4, 't2') + tdSql.checkData(0, 6, 't2') tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)') try: