feat: add user tags to system table
This commit is contained in:
parent
96495363d7
commit
97d30c6858
|
@ -170,6 +170,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_SHOW_STABLES_STMT,
|
QUERY_NODE_SHOW_STABLES_STMT,
|
||||||
QUERY_NODE_SHOW_STREAMS_STMT,
|
QUERY_NODE_SHOW_STREAMS_STMT,
|
||||||
QUERY_NODE_SHOW_TABLES_STMT,
|
QUERY_NODE_SHOW_TABLES_STMT,
|
||||||
|
QUERY_NODE_SHOW_TAGS_STMT,
|
||||||
QUERY_NODE_SHOW_USERS_STMT,
|
QUERY_NODE_SHOW_USERS_STMT,
|
||||||
QUERY_NODE_SHOW_LICENCE_STMT,
|
QUERY_NODE_SHOW_LICENCE_STMT,
|
||||||
QUERY_NODE_SHOW_VGROUPS_STMT,
|
QUERY_NODE_SHOW_VGROUPS_STMT,
|
||||||
|
|
|
@ -157,7 +157,7 @@ static const SSysDbTableSchema userTagsSchema[] = {
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "tag_type", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
{.name = "tag_type", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
|
@ -1596,8 +1596,7 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
|
||||||
|
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||||
strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
|
strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||||
pInfo->pCur != NULL) {
|
|
||||||
metaCloseTbCursor(pInfo->pCur);
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1767,6 +1766,76 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO: check more datatype, json? and return detailed error when len is not enough
|
||||||
|
static int32_t convertTagDataToVarchar(int8_t tagType, char* tagVal, char* str, size_t len) {
|
||||||
|
switch (tagType) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
snprintf(str, len, "%d", *((int8_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
snprintf(str, len, "%u", *((uint8_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
snprintf(str, len, "%d", *((int16_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
snprintf(str, len, "%u", *((uint16_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
snprintf(str, len, "%d", *((int32_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
snprintf(str, len, "%u", *((uint32_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
snprintf(str, len, "%" PRId64, *((int64_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
snprintf(str, len, "%" PRIu64, *((uint64_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float fv = 0;
|
||||||
|
fv = GET_FLOAT_VAL(tagVal);
|
||||||
|
snprintf(str, len, "%f", fv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double dv = 0;
|
||||||
|
dv = GET_DOUBLE_VAL(tagVal);
|
||||||
|
snprintf(str, len, "%lf", dv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
case TSDB_DATA_TYPE_JSON: {
|
||||||
|
memcpy(str, tagVal, len);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
snprintf(str, len, "%" PRId64, *((int64_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
snprintf(str, len, "%d", *((int8_t*)tagVal));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
|
@ -1856,9 +1925,11 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
tagData = (char*)pVal;
|
tagData = (char*)pVal;
|
||||||
}
|
}
|
||||||
|
//TODO: memory allocation for varchar size and make sure enough memory
|
||||||
|
char tagStr[1024];
|
||||||
|
convertTagDataToVarchar(tagType, tagData, tagStr, 1024);
|
||||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||||
colDataAppend(pColInfoData, numOfRows, tagData,
|
colDataAppend(pColInfoData, numOfRows, tagStr,
|
||||||
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
||||||
|
|
||||||
if (tagType != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
if (tagType != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
||||||
|
@ -2068,7 +2139,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
|
@ -371,6 +371,19 @@ static int32_t collectMetaKeyFromShowTables(SCollectMetaKeyCxt* pCxt, SShowStmt*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t collectMetaKeyFromShowTags(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||||
|
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
|
||||||
|
TSDB_INS_TABLE_USER_TAGS, pCxt->pMetaCache);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (NULL != pStmt->pDbName) {
|
||||||
|
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
|
||||||
|
} else {
|
||||||
|
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, pCxt->pMetaCache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t collectMetaKeyFromShowUsers(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
static int32_t collectMetaKeyFromShowUsers(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_USERS,
|
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_USERS,
|
||||||
pCxt->pMetaCache);
|
pCxt->pMetaCache);
|
||||||
|
@ -537,6 +550,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
||||||
return collectMetaKeyFromShowStreams(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowStreams(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||||
return collectMetaKeyFromShowTables(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowTables(pCxt, (SShowStmt*)pStmt);
|
||||||
|
case QUERY_NODE_SHOW_TAGS_STMT:
|
||||||
|
return collectMetaKeyFromShowTags(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_SHOW_USERS_STMT:
|
case QUERY_NODE_SHOW_USERS_STMT:
|
||||||
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||||
|
|
|
@ -1677,7 +1677,8 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
|
||||||
|
|
||||||
static bool sysTableFromVnode(const char* pTable) {
|
static bool sysTableFromVnode(const char* pTable) {
|
||||||
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
|
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
|
||||||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED));
|
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
|
||||||
|
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS)));
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
|
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
|
||||||
|
@ -1693,7 +1694,7 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
|
||||||
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
|
||||||
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1782,7 +1783,8 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
|
||||||
int8_t tableType = pRealTable->pMeta->tableType;
|
int8_t tableType = pRealTable->pMeta->tableType;
|
||||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||||
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES) &&
|
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES) &&
|
||||||
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED);
|
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) &&
|
||||||
|
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TAGS);
|
||||||
}
|
}
|
||||||
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
|
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
|
||||||
}
|
}
|
||||||
|
@ -5025,6 +5027,8 @@ static const char* getSysTableName(ENodeType type) {
|
||||||
return TSDB_INS_TABLE_USER_DATABASES;
|
return TSDB_INS_TABLE_USER_DATABASES;
|
||||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||||
return TSDB_INS_TABLE_USER_TABLES;
|
return TSDB_INS_TABLE_USER_TABLES;
|
||||||
|
case QUERY_NODE_SHOW_TAGS_STMT:
|
||||||
|
return TSDB_INS_TABLE_USER_TAGS;
|
||||||
case QUERY_NODE_SHOW_STABLES_STMT:
|
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||||
return TSDB_INS_TABLE_USER_STABLES;
|
return TSDB_INS_TABLE_USER_STABLES;
|
||||||
case QUERY_NODE_SHOW_USERS_STMT:
|
case QUERY_NODE_SHOW_USERS_STMT:
|
||||||
|
|
|
@ -570,7 +570,8 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
||||||
pScan->showRewrite = pScanLogicNode->showRewrite;
|
pScan->showRewrite = pScanLogicNode->showRewrite;
|
||||||
pScan->accountId = pCxt->pPlanCxt->acctId;
|
pScan->accountId = pCxt->pPlanCxt->acctId;
|
||||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
|
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
|
||||||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
|
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
|
||||||
|
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TAGS)) {
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
} else {
|
} else {
|
||||||
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
||||||
|
|
Loading…
Reference in New Issue