fix: get systable cols error

This commit is contained in:
wangjiaming0909 2023-08-17 16:41:42 +08:00
parent df6c93fdb0
commit 4dcdb549c9
2 changed files with 132 additions and 102 deletions

View File

@ -3159,137 +3159,161 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
return numOfRows; return numOfRows;
} }
#define BUILD_COL_FOR_INFO_DB 1
#define BUILD_COL_FOR_PERF_DB 1 << 1
#define BUILD_COL_FOR_USER_DB 1 << 2
#define BUILD_COL_FOR_ALL_DB (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
static int32_t buildSysDbColsInfo(SSDataBlock *p, char *db, char *tb) { static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
size_t size = 0; size_t size = 0;
const SSysTableMeta *pSysDbTableMeta = NULL; const SSysTableMeta *pSysDbTableMeta = NULL;
if (db[0] && strncmp(db, TSDB_INFORMATION_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0 && if (buildWhichDBs & BUILD_COL_FOR_INFO_DB) {
strncmp(db, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0) { getInfosDbMeta(&pSysDbTableMeta, &size);
return p->info.rows; p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
} }
getInfosDbMeta(&pSysDbTableMeta, &size); if (buildWhichDBs & BUILD_COL_FOR_PERF_DB) {
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb); getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
getPerfDbMeta(&pSysDbTableMeta, &size); }
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
return p->info.rows; return p->info.rows;
} }
static int8_t determineBuildColForWhichDBs(const char* db) {
int8_t buildWhichDBs;
if (!db[0])
buildWhichDBs = BUILD_COL_FOR_ALL_DB;
else {
char *p = strchr(db, '.');
if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
buildWhichDBs = BUILD_COL_FOR_INFO_DB;
} else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
buildWhichDBs = BUILD_COL_FOR_PERF_DB;
} else {
buildWhichDBs = BUILD_COL_FOR_USER_DB;
}
}
return buildWhichDBs;
}
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
uint8_t buildWhichDBs;
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
buildWhichDBs = determineBuildColForWhichDBs(pShow->db);
if (!pShow->sysDbRsp) { if (!pShow->sysDbRsp) {
numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb); numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db); mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
pShow->sysDbRsp = true; pShow->sysDbRsp = true;
} }
SDbObj *pDb = NULL; if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
if (strlen(pShow->db) > 0) { SDbObj *pDb = NULL;
pDb = mndAcquireDb(pMnode, pShow->db); if (strlen(pShow->db) > 0) {
if (pDb == NULL) return terrno; pDb = mndAcquireDb(pMnode, pShow->db);
} if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) {
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
} }
if (pDb != NULL && pStb->dbUid != pDb->uid) { char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
sdbRelease(pSdb, pStb); STR_TO_VARSTR(typeName, "SUPER_TABLE");
continue; bool fetch = pShow->restore ? false : true;
} pShow->restore = false;
while (numOfRows < rows) {
SName name = {0}; if (fetch) {
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN); if (pShow->pIter == NULL) break;
if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) { } else {
sdbRelease(pSdb, pStb); fetch = true;
continue; void *pKey = taosHashGetKey(pShow->pIter, NULL);
} pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
} }
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db); continue;
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, colName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} }
varDataSetLen(colTypeStr, colTypeLen);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SName name = {0};
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false); char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
while (cols < pShow->numOfColumns) { mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows); colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, colName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
while (cols < pShow->numOfColumns) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
}
numOfRows++;
} }
numOfRows++;
sdbRelease(pSdb, pStb);
} }
sdbRelease(pSdb, pStb); if (pDb != NULL) {
} mndReleaseDb(pMnode, pDb);
}
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;

View File

@ -215,7 +215,13 @@ class TDTestCase:
for t in range (2): for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"') tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(195, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))
def ins_dnodes_check(self): def ins_dnodes_check(self):
tdSql.execute('drop database if exists db2') tdSql.execute('drop database if exists db2')
tdSql.execute('create database if not exists db2 vgroups 1 replica 1') tdSql.execute('create database if not exists db2 vgroups 1 replica 1')