Merge pull request #21282 from taosdata/feature/TD-23117

feat:[TD-23117] add schema for ins_topics
This commit is contained in:
Haojun Liao 2023-05-24 14:54:42 +08:00 committed by GitHub
commit 99f669434a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 166 additions and 3 deletions

View File

@ -253,6 +253,7 @@ typedef enum ELogicConditionType {
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN 2048
#define TSDB_SHOW_SCHEMA_JSON_LEN TSDB_MAX_COLUMNS * 256
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000

View File

@ -280,7 +280,9 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
// TODO config
{.name = "schema", .bytes = TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "meta", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
};

View File

@ -32,6 +32,7 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src);
const char *mndGetStableStr(const char *src);
int32_t mndProcessCompactDbReq(SRpcMsg *pReq);

View File

@ -521,6 +521,7 @@ typedef struct {
char* physicalPlan;
SSchemaWrapper schema;
int64_t stbUid;
char stbName[TSDB_TABLE_FNAME_LEN];
// forbid condition
int64_t ntbUid;
SArray* ntbColIds;

View File

@ -1543,6 +1543,13 @@ const char *mndGetDbStr(const char *src) {
return pos;
}
const char *mndGetStableStr(const char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER);
if (pos != NULL) ++pos;
if (pos == NULL) return src;
return mndGetDbStr(pos);
}
static int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
int64_t v = 0;
switch (unit) {

View File

@ -285,6 +285,7 @@ void dumpTopic(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "subType", i642str(pObj->subType));
tjsonAddStringToObject(item, "withMeta", i642str(pObj->withMeta));
tjsonAddStringToObject(item, "stbUid", i642str(pObj->stbUid));
tjsonAddStringToObject(item, "stbName", mndGetStableStr(pObj->stbName));
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));
tjsonAddStringToObject(item, "astLen", i642str(pObj->astLen));
tjsonAddStringToObject(item, "sqlLen", i642str(pObj->sqlLen));

View File

@ -109,6 +109,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
@ -196,6 +197,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
if (pTopic->sql == NULL) {
@ -460,6 +462,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
return -1;
}
strcpy(topicObj.stbName, pCreate->subStbName);
topicObj.stbUid = pStb->uid;
mndReleaseStb(pMnode, pStb);
}
@ -830,6 +833,43 @@ int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
return 0;
}
static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson){
char* string = NULL;
cJSON* columns = cJSON_CreateArray();
if (columns == NULL) {
return;
}
for (int i = 0; i < nCols; i++) {
cJSON* column = cJSON_CreateObject();
SSchema* s = schema + i;
cJSON* cname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(column, "name", cname);
cJSON* ctype = cJSON_CreateString(tDataTypes[s->type].name);
cJSON_AddItemToObject(column, "type", ctype);
int32_t length = 0;
if (s->type == TSDB_DATA_TYPE_BINARY) {
length = s->bytes - VARSTR_HEADER_SIZE;
} else if (s->type == TSDB_DATA_TYPE_NCHAR || s->type == TSDB_DATA_TYPE_JSON) {
length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} else{
length = s->bytes;
}
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
cJSON_AddItemToArray(columns, column);
}
string = cJSON_PrintUnformatted(columns);
cJSON_Delete(columns);
size_t len = strlen(string);
if(string && len <= TSDB_SHOW_SCHEMA_JSON_LEN){
STR_TO_VARSTR(schemaJson, string);
}else{
mError("mndRetrieveTopic build schema error json:%p, json len:%zu", string, len);
}
taosMemoryFree(string);
}
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -868,6 +908,49 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
char *schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
taosMemoryFree(schemaJson);
return -1;
}
schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
mndReleaseStb(pMnode, pStb);
}else{
STR_TO_VARSTR(schemaJson, "NULL");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)schemaJson, false);
taosMemoryFree(schemaJson);
char mete[4 + VARSTR_HEADER_SIZE] = {0};
if(pTopic->withMeta){
STR_TO_VARSTR(mete, "yes");
}else{
STR_TO_VARSTR(mete, "no");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)mete, false);
char type[8 + VARSTR_HEADER_SIZE] = {0};
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
STR_TO_VARSTR(type, "column");
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
STR_TO_VARSTR(type, "stable");
}else{
STR_TO_VARSTR(type, "db");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)type, false);
numOfRows++;
sdbRelease(pSdb, pTopic);
}

View File

@ -22,8 +22,8 @@ class TDTestCase:
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns")
# enterprise version: 285, community version: 277
tdSql.checkData(0, 0, 285)
# enterprise version: 288, community version: 280
tdSql.checkData(0, 0, 288)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14)

View File

@ -0,0 +1,67 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
# rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
# updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def test(self):
tdLog.info("create database, stb, ctb")
tdSql.execute("create database if not exists db1 vgroups 4 wal_retention_period 3600")
tdSql.execute("create table if not exists db1.st(ts timestamp, c1 int, c2 bool, c3 tinyint, c4 double, c5 nchar(8)) tags(t1 int, t2 float, t3 binary(4))")
tdSql.execute("create table if not exists db1.nt(ts timestamp, c1 smallint, c2 float, c3 binary(64), c4 bigint)")
tdSql.execute("create table if not exists db1.st1 using db1.st tags(1, 9.3, \"st1\")")
tdLog.info("create topic")
tdSql.execute("create topic topic_1 as database db1")
tdSql.execute("create topic topic_2 with meta as stable db1.st")
tdSql.execute("create topic topic_3 as select * from db1.nt")
tdSql.execute("create topic topic_4 as select ts,c3,c5,t2 from db1.st")
tdSql.query("select * from information_schema.ins_topics order by topic_name")
tdSql.checkRows(4)
tdSql.checkData(0, 4, "NULL")
tdSql.checkData(0, 5, "no")
tdSql.checkData(0, 6, "db")
tdSql.checkData(1, 4, "[{\"name\":\"ts\",\"type\":\"TIMESTAMP\",\"length\":8},{\"name\":\"c1\",\"type\":\"INT\",\"length\":4},{\"name\":\"c2\",\"type\":\"BOOL\",\"length\":1},{\"name\":\"c3\",\"type\":\"TINYINT\",\"length\":1},{\"name\":\"c4\",\"type\":\"DOUBLE\",\"length\":8},{\"name\":\"c5\",\"type\":\"NCHAR\",\"length\":8}]")
tdSql.checkData(1, 5, "yes")
tdSql.checkData(1, 6, "stable")
tdSql.checkData(2, 4, "[{\"name\":\"ts\",\"type\":\"TIMESTAMP\",\"length\":8},{\"name\":\"c1\",\"type\":\"SMALLINT\",\"length\":2},{\"name\":\"c2\",\"type\":\"FLOAT\",\"length\":4},{\"name\":\"c3\",\"type\":\"VARCHAR\",\"length\":64},{\"name\":\"c4\",\"type\":\"BIGINT\",\"length\":8}]")
tdSql.checkData(2, 5, "no")
tdSql.checkData(2, 6, "column")
tdSql.checkData(3, 4, "[{\"name\":\"ts\",\"type\":\"TIMESTAMP\",\"length\":8},{\"name\":\"c3\",\"type\":\"TINYINT\",\"length\":1},{\"name\":\"c5\",\"type\":\"NCHAR\",\"length\":8},{\"name\":\"t2\",\"type\":\"FLOAT\",\"length\":4}]")
tdSql.checkData(3, 5, "no")
tdSql.checkData(3, 6, "column")
tdLog.printNoPrefix("======== test case end ...... ")
def run(self):
self.test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())