Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-31459

This commit is contained in:
54liuyao 2024-08-15 14:28:40 +08:00
commit fa43095d37
13 changed files with 83 additions and 21 deletions

View File

@ -146,7 +146,7 @@ Note: 从 3.0.1.7 开始,只提供 TDengine 客户端的 Windows 客户端的
</Tabs>
:::info
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases/tdengine)。
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](https://docs.taosdata.com/releases/tdengine/)。
:::
:::note

View File

@ -99,7 +99,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
- **安装前准备**
- 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
- 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
- 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
- 如果使用原生连接,还需[安装客户端驱动](../connect/#安装客户端驱动-taosc)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
- **使用 pip 安装**
- 卸载旧版本

View File

@ -101,7 +101,7 @@ head 文件是时序数据存储文件data 文件)的 BRINBlock Range In
head 文件中存储了多个 BRIN 记录块及其索引。BRIN 记录块采用列存压缩的方式这种方式可以大大减少空间占用同时保持较高的查询性能。BRIN 索引结构如下图所示:
[BRIN 索引结构](./brin.png)
![BRIN 索引结构](./brin.png)
#### data 文件

View File

@ -2837,6 +2837,8 @@ typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
SArray* topicNames; // SArray<char**>
int8_t withTbName;
@ -2870,6 +2872,8 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
tlen += taosEncodeString(buf, pReq->user);
tlen += taosEncodeString(buf, pReq->fqdn);
return tlen;
}
@ -2904,6 +2908,8 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
if ((char*)buf - (char*)start < len) {
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
buf = taosDecodeStringTo(buf, pReq->user);
buf = taosDecodeStringTo(buf, pReq->fqdn);
} else {
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;

View File

@ -82,6 +82,8 @@ struct tmq_t {
int64_t refId;
char groupId[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
@ -1265,6 +1267,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) {
(void)strcpy(pTmq->fqdn, "localhost");
}
if (conf->replayEnable) {
pTmq->autoCommit = false;
}
@ -1332,6 +1338,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) {

View File

@ -344,7 +344,9 @@ static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
};
@ -480,11 +482,12 @@ static const SSysDbTableSchema connectionsSchema[] = {
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};
static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/

View File

@ -597,6 +597,8 @@ typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;

View File

@ -903,6 +903,22 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
// user
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(user, pConsumer->user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
// fqdn
char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(fqdn, pConsumer->fqdn);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
// status
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);

View File

@ -325,6 +325,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
if (pConsumer->rebNewTopics == NULL){
@ -429,6 +431,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
tlen += taosEncodeString(buf, pConsumer->user);
tlen += taosEncodeString(buf, pConsumer->fqdn);
return tlen;
}
@ -503,6 +507,8 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
if (sver > 2){
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
buf = taosDecodeStringTo(buf, pConsumer->user);
buf = taosDecodeStringTo(buf, pConsumer->fqdn);
} else{
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;

View File

@ -1330,8 +1330,8 @@ END:
TAOS_RETURN(code);
}
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic,
const char *cgroup, SArray *vgs, SArray *offsetRows) {
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
int32_t code = 0;
int32_t sz = taosArrayGetSize(vgs);
for (int32_t j = 0; j < sz; j++) {
@ -1355,7 +1355,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
// consumer id
char consumerIdHex[32] = {0};
char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, consumerId);
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
@ -1363,6 +1363,18 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
if (user) STR_TO_VARSTR(userStr, user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
varDataVal(cgroup), pVgEp->vgId);
@ -1435,16 +1447,25 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
SMqConsumerEp *pConsumerEp = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pConsumerEp = (SMqConsumerEp *)pIter;
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs,
char *user = NULL;
char *fqdn = NULL;
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
if (pConsumer != NULL) {
user = pConsumer->user;
fqdn = pConsumer->fqdn;
sdbRelease(pSdb, pConsumer);
}
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
pConsumerEp->offsetRows));
}
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
pBlock->info.rows = numOfRows;

View File

@ -222,10 +222,10 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 269))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 271))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))
tdSql.checkEqual(56, len(tdSql.queryResult))
def ins_dnodes_check(self):
tdSql.execute('drop database if exists db2')

View File

@ -85,7 +85,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -196,7 +196,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -306,7 +306,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -416,7 +416,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -517,7 +517,7 @@ class TDTestCase:
consumer.close()
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")

View File

@ -598,12 +598,12 @@ class TDTestCase:
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.checkData(0, 6, 't2')
tdSql.execute(f'drop consumer group g1 on t1')
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.checkData(0, 6, 't2')
tdSql.query(f'show subscriptions')
tdSql.checkRows(1)
@ -641,7 +641,7 @@ class TDTestCase:
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.checkData(0, 6, 't2')
tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)')
try: