feat: add user tags
This commit is contained in:
parent
cede4b7624
commit
fcc87f09fe
|
@ -35,6 +35,7 @@ extern "C" {
|
||||||
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
||||||
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
|
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
|
||||||
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
|
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
|
||||||
|
#define TSDB_INS_TABLE_USER_TAGS "user_tags"
|
||||||
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
|
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
|
||||||
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
||||||
#define TSDB_INS_TABLE_LICENCES "grants"
|
#define TSDB_INS_TABLE_LICENCES "grants"
|
||||||
|
|
|
@ -153,6 +153,16 @@ static const SSysDbTableSchema userTblsSchema[] = {
|
||||||
{.name = "type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const SSysDbTableSchema userTagsSchema[] = {
|
||||||
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "tag_type", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userTblDistSchema[] = {
|
static const SSysDbTableSchema userTblDistSchema[] = {
|
||||||
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
@ -255,6 +265,7 @@ static const SSysTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
||||||
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
||||||
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
||||||
|
{TSDB_INS_TABLE_USER_TAGS, userTagsSchema, tListLen(userTagsSchema)},
|
||||||
// {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
// {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
||||||
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
||||||
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
|
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
|
||||||
|
|
|
@ -1765,6 +1765,172 @@ static SSDataBlock* buildSysTableMetaBlock() {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pCur == NULL) {
|
||||||
|
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
|
const char* db = NULL;
|
||||||
|
int32_t vgId = 0;
|
||||||
|
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||||
|
|
||||||
|
SName sn = {0};
|
||||||
|
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
|
||||||
|
tNameGetDbName(&sn, varDataVal(dbname));
|
||||||
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
|
SSDataBlock* p = buildSysTableMetaBlock();
|
||||||
|
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
|
||||||
|
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
|
||||||
|
|
||||||
|
// table name
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, n, false);
|
||||||
|
|
||||||
|
// database name
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, dbname, false);
|
||||||
|
|
||||||
|
// vgId
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 6);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);
|
||||||
|
|
||||||
|
int32_t tableType = pInfo->pCur->mr.me.type;
|
||||||
|
if (tableType == TSDB_CHILD_TABLE) {
|
||||||
|
// create time
|
||||||
|
int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
||||||
|
|
||||||
|
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||||
|
int32_t code = metaGetTableEntryByUid(&mr, suid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
// number of columns
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
|
||||||
|
|
||||||
|
// super table name
|
||||||
|
STR_TO_VARSTR(n, mr.me.name);
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, n, false);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
// table comment
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||||
|
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
|
||||||
|
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||||
|
} else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
|
||||||
|
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(comment, "");
|
||||||
|
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||||
|
} else {
|
||||||
|
colDataAppendNULL(pColInfoData, numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
|
||||||
|
|
||||||
|
// ttl
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 7);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);
|
||||||
|
|
||||||
|
STR_TO_VARSTR(n, "CHILD_TABLE");
|
||||||
|
} else if (tableType == TSDB_NORMAL_TABLE) {
|
||||||
|
// create time
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);
|
||||||
|
|
||||||
|
// number of columns
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
|
||||||
|
|
||||||
|
// super table name
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||||
|
colDataAppendNULL(pColInfoData, numOfRows);
|
||||||
|
|
||||||
|
// table comment
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||||
|
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
|
||||||
|
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||||
|
} else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
|
||||||
|
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(comment, "");
|
||||||
|
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||||
|
} else {
|
||||||
|
colDataAppendNULL(pColInfoData, numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
|
||||||
|
|
||||||
|
// ttl
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 7);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);
|
||||||
|
|
||||||
|
STR_TO_VARSTR(n, "NORMAL_TABLE");
|
||||||
|
}
|
||||||
|
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 9);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, n, false);
|
||||||
|
|
||||||
|
if (++numOfRows >= pOperator->resultInfo.capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||||
|
if (ret != 0) {
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
|
p->info.rows = numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||||
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
|
||||||
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
// build message and send to mnode to fetch the content of system tables.
|
// build message and send to mnode to fetch the content of system tables.
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -2906,4 +3072,3 @@ _error:
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue