Merge pull request #15001 from taosdata/szhou/feature/user-tags
feat: add user tags system table
This commit is contained in:
commit
75578b46e5
|
@ -35,6 +35,7 @@ extern "C" {
|
||||||
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
||||||
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
|
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
|
||||||
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
|
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
|
||||||
|
#define TSDB_INS_TABLE_USER_TAGS "user_tags"
|
||||||
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
|
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
|
||||||
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
||||||
#define TSDB_INS_TABLE_LICENCES "grants"
|
#define TSDB_INS_TABLE_LICENCES "grants"
|
||||||
|
|
|
@ -104,6 +104,7 @@ typedef enum _mgmt_table {
|
||||||
TSDB_MGMT_TABLE_STB,
|
TSDB_MGMT_TABLE_STB,
|
||||||
TSDB_MGMT_TABLE_STREAMS,
|
TSDB_MGMT_TABLE_STREAMS,
|
||||||
TSDB_MGMT_TABLE_TABLE,
|
TSDB_MGMT_TABLE_TABLE,
|
||||||
|
TSDB_MGMT_TABLE_TAG,
|
||||||
TSDB_MGMT_TABLE_USER,
|
TSDB_MGMT_TABLE_USER,
|
||||||
TSDB_MGMT_TABLE_GRANTS,
|
TSDB_MGMT_TABLE_GRANTS,
|
||||||
TSDB_MGMT_TABLE_VGROUP,
|
TSDB_MGMT_TABLE_VGROUP,
|
||||||
|
|
|
@ -170,6 +170,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_SHOW_STABLES_STMT,
|
QUERY_NODE_SHOW_STABLES_STMT,
|
||||||
QUERY_NODE_SHOW_STREAMS_STMT,
|
QUERY_NODE_SHOW_STREAMS_STMT,
|
||||||
QUERY_NODE_SHOW_TABLES_STMT,
|
QUERY_NODE_SHOW_TABLES_STMT,
|
||||||
|
QUERY_NODE_SHOW_TAGS_STMT,
|
||||||
QUERY_NODE_SHOW_USERS_STMT,
|
QUERY_NODE_SHOW_USERS_STMT,
|
||||||
QUERY_NODE_SHOW_LICENCE_STMT,
|
QUERY_NODE_SHOW_LICENCE_STMT,
|
||||||
QUERY_NODE_SHOW_VGROUPS_STMT,
|
QUERY_NODE_SHOW_VGROUPS_STMT,
|
||||||
|
|
|
@ -153,6 +153,15 @@ static const SSysDbTableSchema userTblsSchema[] = {
|
||||||
{.name = "type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const SSysDbTableSchema userTagsSchema[] = {
|
||||||
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
{.name = "tag_type", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
|
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema userTblDistSchema[] = {
|
static const SSysDbTableSchema userTblDistSchema[] = {
|
||||||
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
|
@ -255,6 +264,7 @@ static const SSysTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
||||||
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
||||||
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
|
||||||
|
{TSDB_INS_TABLE_USER_TAGS, userTagsSchema, tListLen(userTagsSchema)},
|
||||||
// {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
// {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
|
||||||
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
||||||
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
|
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
|
||||||
|
|
|
@ -76,6 +76,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
|
||||||
type = TSDB_MGMT_TABLE_STB;
|
type = TSDB_MGMT_TABLE_STB;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
|
||||||
type = TSDB_MGMT_TABLE_TABLE;
|
type = TSDB_MGMT_TABLE_TABLE;
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, len) == 0) {
|
||||||
|
type = TSDB_MGMT_TABLE_TAG;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
|
||||||
// type = TSDB_MGMT_TABLE_DIST;
|
// type = TSDB_MGMT_TABLE_DIST;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) {
|
||||||
|
|
|
@ -1596,7 +1596,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
||||||
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
|
pInfo->sessionSup =
|
||||||
|
(SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
|
||||||
pInfo->groupId = 0;
|
pInfo->groupId = 0;
|
||||||
pInfo->pPullDataRes = createPullDataBlock();
|
pInfo->pPullDataRes = createPullDataBlock();
|
||||||
pInfo->pStreamScanOp = pOperator;
|
pInfo->pStreamScanOp = pOperator;
|
||||||
|
@ -1630,7 +1631,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
|
||||||
blockDataDestroy(pInfo->pRes);
|
blockDataDestroy(pInfo->pRes);
|
||||||
|
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||||
|
strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
|
||||||
metaCloseTbCursor(pInfo->pCur);
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
pInfo->pCur = NULL;
|
pInfo->pCur = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1777,14 +1779,14 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* buildSysTableMetaBlock() {
|
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
const SSysTableMeta* pMeta = NULL;
|
const SSysTableMeta* pMeta = NULL;
|
||||||
getInfosDbMeta(&pMeta, &size);
|
getInfosDbMeta(&pMeta, &size);
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
|
if (strcmp(pMeta[i].name, tableName) == 0) {
|
||||||
index = i;
|
index = i;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1800,14 +1802,235 @@ static SSDataBlock* buildSysTableMetaBlock() {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
// TODO: check more datatype, json? and return detailed error when len is not enough
|
||||||
// build message and send to mnode to fetch the content of system tables.
|
static int32_t convertTagDataToTagVarchar(int8_t tagType, char* tagVal, uint32_t tagLen, char* varData,
|
||||||
|
int32_t bufSize) {
|
||||||
|
int outputLen = -1;
|
||||||
|
switch (tagType) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int8_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%u", *((uint8_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int16_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%u", *((uint16_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int32_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%u", *((uint32_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%" PRId64, *((int64_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%" PRIu64, *((uint64_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float fv = 0;
|
||||||
|
fv = GET_FLOAT_VAL(tagVal);
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%f", fv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double dv = 0;
|
||||||
|
dv = GET_DOUBLE_VAL(tagVal);
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%lf", dv);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
case TSDB_DATA_TYPE_JSON: {
|
||||||
|
memcpy(varDataVal(varData), tagVal, tagLen);
|
||||||
|
outputLen = tagLen;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%" PRId64, *((int64_t*)tagVal));
|
||||||
|
break;
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int8_t*)tagVal));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (outputLen < 0 || outputLen == bufSize && !IS_VAR_DATA_TYPE(tagType) || outputLen > bufSize) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
varDataSetLen(varData, outputLen);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// retrieve local table list info from vnode
|
if (pInfo->pCur == NULL) {
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
|
const char* db = NULL;
|
||||||
|
int32_t vgId = 0;
|
||||||
|
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||||
|
|
||||||
|
SName sn = {0};
|
||||||
|
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
|
||||||
|
tNameGetDbName(&sn, varDataVal(dbname));
|
||||||
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
|
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USER_TAGS);
|
||||||
|
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
|
||||||
|
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
|
||||||
|
|
||||||
|
SMetaReader smr = {0};
|
||||||
|
metaReaderInit(&smr, pInfo->readHandle.meta, 0);
|
||||||
|
|
||||||
|
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
|
||||||
|
int32_t code = metaGetTableEntryByUid(&smr, suid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
metaReaderClear(&smr);
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(stableName, smr.me.name);
|
||||||
|
|
||||||
|
int32_t numOfTags = smr.me.stbEntry.schemaTag.nCols;
|
||||||
|
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = NULL;
|
||||||
|
|
||||||
|
// table name
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, tableName, false);
|
||||||
|
|
||||||
|
// database name
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, dbname, false);
|
||||||
|
|
||||||
|
// super table name
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, stableName, false);
|
||||||
|
|
||||||
|
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(tagName, smr.me.stbEntry.schemaTag.pSchema[i].name);
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, tagName, false);
|
||||||
|
|
||||||
|
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, (char*)&tagType, false);
|
||||||
|
|
||||||
|
STagVal tagVal = {0};
|
||||||
|
tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId;
|
||||||
|
char* tagData = NULL;
|
||||||
|
uint32_t tagLen = 0;
|
||||||
|
if (tagType == TSDB_DATA_TYPE_JSON) {
|
||||||
|
// TODO: json type?+varheader+data
|
||||||
|
tagData = varDataVal(pInfo->pCur->mr.me.ctbEntry.pTags + 1);
|
||||||
|
tagLen = varDataLen(pInfo->pCur->mr.me.ctbEntry.pTags + 1);
|
||||||
|
} else {
|
||||||
|
bool exist = tTagGet((STag*)pInfo->pCur->mr.me.ctbEntry.pTags, &tagVal);
|
||||||
|
if (exist) {
|
||||||
|
if (IS_VAR_DATA_TYPE(tagType)) {
|
||||||
|
tagData = (char*)tagVal.pData;
|
||||||
|
tagLen = tagVal.nData;
|
||||||
|
} else {
|
||||||
|
tagData = (char*)&tagVal.i64;
|
||||||
|
tagLen = tDataTypes[tagType].bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
|
||||||
|
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
|
||||||
|
char* tagVarChar = NULL;
|
||||||
|
if (tagData != NULL) {
|
||||||
|
tagVarChar = taosMemoryMalloc(bufSize);
|
||||||
|
code = convertTagDataToTagVarchar(tagType, tagData, tagLen, tagVarChar, bufSize);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
taosMemoryFree(tagVarChar);
|
||||||
|
metaReaderClear(&smr);
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||||
|
colDataAppend(pColInfoData, numOfRows, tagVarChar,
|
||||||
|
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
||||||
|
taosMemoryFree(tagVarChar);
|
||||||
|
|
||||||
|
++numOfRows;
|
||||||
|
}
|
||||||
|
metaReaderClear(&smr);
|
||||||
|
|
||||||
|
if (numOfRows >= pOperator->resultInfo.capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
|
||||||
|
if (ret != 0) {
|
||||||
|
metaCloseTbCursor(pInfo->pCur);
|
||||||
|
pInfo->pCur = NULL;
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
|
p->info.rows = numOfRows;
|
||||||
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
|
||||||
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
|
||||||
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1840,7 +2063,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
tNameGetDbName(&sn, varDataVal(dbname));
|
tNameGetDbName(&sn, varDataVal(dbname));
|
||||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
SSDataBlock* p = buildSysTableMetaBlock();
|
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USER_TABLES);
|
||||||
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
@ -1979,6 +2202,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
|
// build message and send to mnode to fetch the content of system tables.
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||||
|
return sysTableScanUserTables(pOperator);
|
||||||
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||||
|
return sysTableScanUserTags(pOperator);
|
||||||
} else { // load the meta from mnode of the given epset
|
} else { // load the meta from mnode of the given epset
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2058,7 +2293,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
||||||
SSDataBlock* p = buildSysTableMetaBlock();
|
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USER_TABLES);
|
||||||
blockDataEnsureCapacity(p, capacity);
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
|
@ -2147,7 +2382,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
tNameAssign(&pInfo->name, &pScanNode->tableName);
|
tNameAssign(&pInfo->name, &pScanNode->tableName);
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
|
|
||||||
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
|
||||||
|
strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||||
pInfo->readHandle = *(SReadHandle*)readHandle;
|
pInfo->readHandle = *(SReadHandle*)readHandle;
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2941,4 +3177,3 @@ _error:
|
||||||
taosMemoryFree(pOperator);
|
taosMemoryFree(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -371,6 +371,19 @@ static int32_t collectMetaKeyFromShowTables(SCollectMetaKeyCxt* pCxt, SShowStmt*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t collectMetaKeyFromShowTags(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||||
|
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
|
||||||
|
TSDB_INS_TABLE_USER_TAGS, pCxt->pMetaCache);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (NULL != pStmt->pDbName) {
|
||||||
|
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
|
||||||
|
} else {
|
||||||
|
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, pCxt->pMetaCache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t collectMetaKeyFromShowUsers(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
static int32_t collectMetaKeyFromShowUsers(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_USERS,
|
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_USERS,
|
||||||
pCxt->pMetaCache);
|
pCxt->pMetaCache);
|
||||||
|
@ -537,6 +550,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
||||||
return collectMetaKeyFromShowStreams(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowStreams(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||||
return collectMetaKeyFromShowTables(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowTables(pCxt, (SShowStmt*)pStmt);
|
||||||
|
case QUERY_NODE_SHOW_TAGS_STMT:
|
||||||
|
return collectMetaKeyFromShowTags(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_SHOW_USERS_STMT:
|
case QUERY_NODE_SHOW_USERS_STMT:
|
||||||
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
|
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
|
||||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||||
|
|
|
@ -1685,7 +1685,8 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
|
||||||
|
|
||||||
static bool sysTableFromVnode(const char* pTable) {
|
static bool sysTableFromVnode(const char* pTable) {
|
||||||
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
|
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
|
||||||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED));
|
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
|
||||||
|
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS)));
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
|
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
|
||||||
|
@ -1701,7 +1702,7 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
|
||||||
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
|
||||||
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1790,7 +1791,8 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
|
||||||
int8_t tableType = pRealTable->pMeta->tableType;
|
int8_t tableType = pRealTable->pMeta->tableType;
|
||||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||||
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES) &&
|
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES) &&
|
||||||
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED);
|
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) &&
|
||||||
|
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TAGS);
|
||||||
}
|
}
|
||||||
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
|
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
|
||||||
}
|
}
|
||||||
|
@ -5063,6 +5065,8 @@ static const char* getSysTableName(ENodeType type) {
|
||||||
return TSDB_INS_TABLE_USER_DATABASES;
|
return TSDB_INS_TABLE_USER_DATABASES;
|
||||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||||
return TSDB_INS_TABLE_USER_TABLES;
|
return TSDB_INS_TABLE_USER_TABLES;
|
||||||
|
case QUERY_NODE_SHOW_TAGS_STMT:
|
||||||
|
return TSDB_INS_TABLE_USER_TAGS;
|
||||||
case QUERY_NODE_SHOW_STABLES_STMT:
|
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||||
return TSDB_INS_TABLE_USER_STABLES;
|
return TSDB_INS_TABLE_USER_STABLES;
|
||||||
case QUERY_NODE_SHOW_USERS_STMT:
|
case QUERY_NODE_SHOW_USERS_STMT:
|
||||||
|
|
|
@ -570,7 +570,8 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
||||||
pScan->showRewrite = pScanLogicNode->showRewrite;
|
pScan->showRewrite = pScanLogicNode->showRewrite;
|
||||||
pScan->accountId = pCxt->pPlanCxt->acctId;
|
pScan->accountId = pCxt->pPlanCxt->acctId;
|
||||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
|
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
|
||||||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
|
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
|
||||||
|
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TAGS)) {
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
} else {
|
} else {
|
||||||
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
||||||
|
|
|
@ -99,7 +99,7 @@ if $rows != 1 then
|
||||||
endi
|
endi
|
||||||
#sql select * from information_schema.`streams`
|
#sql select * from information_schema.`streams`
|
||||||
sql select * from information_schema.user_tables
|
sql select * from information_schema.user_tables
|
||||||
if $rows != 30 then
|
if $rows != 31 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
#sql select * from information_schema.user_table_distributed
|
#sql select * from information_schema.user_table_distributed
|
||||||
|
@ -197,7 +197,7 @@ if $rows != 1 then
|
||||||
endi
|
endi
|
||||||
#sql select * from performance_schema.`streams`
|
#sql select * from performance_schema.`streams`
|
||||||
sql select * from information_schema.user_tables
|
sql select * from information_schema.user_tables
|
||||||
if $rows != 30 then
|
if $rows != 31 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
#sql select * from information_schema.user_table_distributed
|
#sql select * from information_schema.user_table_distributed
|
||||||
|
|
|
@ -104,7 +104,7 @@ if $rows != 1 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from information_schema.user_tables
|
sql select * from information_schema.user_tables
|
||||||
if $rows != 30 then
|
if $rows != 31 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue