fix: refactor table scan count
This commit is contained in:
parent
a2de81e908
commit
ee5d0a384f
|
@ -489,8 +489,8 @@ typedef struct STableCountScanSupp {
|
|||
|
||||
bool groupByDbName;
|
||||
bool groupByStbName;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
char dbNameFilter[TSDB_DB_NAME_LEN];
|
||||
char stbNameFilter[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
} STableCountScanSupp;
|
||||
|
||||
|
@ -498,16 +498,10 @@ typedef struct STableCountScanOperatorInfo {
|
|||
SReadHandle readHandle;
|
||||
SSDataBlock* pRes;
|
||||
|
||||
SName tableName;
|
||||
|
||||
SNodeList* groupTags;
|
||||
SNodeList* scanCols;
|
||||
SNodeList* pseudoCols;
|
||||
|
||||
STableCountScanSupp supp;
|
||||
|
||||
int32_t currGrpIdx;
|
||||
SArray* stbUidList; // when group by db_name and stable_name
|
||||
SArray* stbUidList; // when group by db_name and/or stable_name
|
||||
} STableCountScanOperatorInfo;
|
||||
|
||||
typedef struct SOptrBasicInfo {
|
||||
|
|
|
@ -486,10 +486,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid,
|
||||
tstrerror(terrno), idStr);
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
|
||||
pBlock->info.id.uid, tstrerror(terrno), idStr);
|
||||
} else {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno), idStr);
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
|
||||
idStr);
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
|
@ -1435,8 +1436,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|||
NULL);
|
||||
if (closedWin && pInfo->partitionSup.needCalc) {
|
||||
gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
|
||||
appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
|
||||
NULL);
|
||||
appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
|
||||
&gpId, NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2461,7 +2462,8 @@ static void destroyTagScanOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -2917,6 +2919,21 @@ _error:
|
|||
// TableCountScanOperator
|
||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
|
||||
static void destoryTableCountScanOperator(void* param);
|
||||
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
|
||||
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||
SSDataBlock* pRes, char* dbName);
|
||||
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
|
||||
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
|
||||
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes);
|
||||
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
|
||||
size_t perfdbTableNum);
|
||||
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
|
||||
size_t infodbTableNum, size_t perfdbTableNum);
|
||||
static const char* GROUP_TAG_DB_NAME = "db_name";
|
||||
static const char* GROUP_TAG_STABLE_NAME = "stable_name";
|
||||
|
||||
|
@ -2978,8 +2995,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun
|
|||
}
|
||||
}
|
||||
} else {
|
||||
strncpy(supp->dbName, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
||||
strncpy(supp->stbName, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
||||
strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
|
||||
strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -3061,7 +3078,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
|
|||
if (pSupp->stbNameSlotId != -1) {
|
||||
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
|
||||
if (strlen(stbName) != 0) {
|
||||
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
strncpy(varDataVal(varStbName), stbName, strlen(stbName));
|
||||
varDataSetLen(varStbName, strlen(stbName));
|
||||
colDataAppend(colInfoData, 0, varStbName, false);
|
||||
|
@ -3087,33 +3104,43 @@ static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountSc
|
|||
getPerfDbMeta(NULL, &perfdbTableNum);
|
||||
|
||||
if (pSupp->groupByDbName) {
|
||||
if (pInfo->currGrpIdx == 0) {
|
||||
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
|
||||
pRes->info.id.groupId = groupId;
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
|
||||
} else if (pInfo->currGrpIdx == 1) {
|
||||
uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
|
||||
pRes->info.id.groupId = groupId;
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
|
||||
} else {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
pInfo->currGrpIdx++;
|
||||
buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
|
||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
||||
} else {
|
||||
if (strcmp(pSupp->dbName, TSDB_INFORMATION_SCHEMA_DB) == 0) {
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
|
||||
} else if (strcmp(pSupp->dbName, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
|
||||
} else if (strlen(pSupp->dbName) == 0) {
|
||||
fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
|
||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
|
||||
size_t infodbTableNum, size_t perfdbTableNum) {
|
||||
if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
|
||||
} else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
|
||||
} else if (strlen(pSupp->dbNameFilter) == 0) {
|
||||
fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
|
||||
size_t perfdbTableNum) {
|
||||
if (pInfo->currGrpIdx == 0) {
|
||||
uint64_t groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
|
||||
pRes->info.id.groupId = groupId;
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
|
||||
} else if (pInfo->currGrpIdx == 1) {
|
||||
uint64_t groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
|
||||
pRes->info.id.groupId = groupId;
|
||||
fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
|
||||
} else {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
pInfo->currGrpIdx++;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableCountScanOperatorInfo* pInfo = pOperator->info;
|
||||
|
@ -3128,89 +3155,110 @@ static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
|||
return buildSysDbTableCount(pOperator, pInfo);
|
||||
}
|
||||
|
||||
return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
|
||||
}
|
||||
|
||||
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
|
||||
const char* db = NULL;
|
||||
int32_t vgId = 0;
|
||||
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||
|
||||
{
|
||||
// get dbname
|
||||
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
SName sn = {0};
|
||||
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&sn, dbName);
|
||||
}
|
||||
// get dbname
|
||||
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
|
||||
SName sn = {0};
|
||||
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&sn, dbName);
|
||||
|
||||
if (pSupp->groupByDbName) {
|
||||
if (pSupp->groupByStbName) {
|
||||
if (pInfo->stbUidList == NULL) {
|
||||
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
|
||||
if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
|
||||
qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
|
||||
}
|
||||
buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
|
||||
} else {
|
||||
buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
|
||||
}
|
||||
return pRes->info.rows > 0 ? pRes : NULL;
|
||||
}
|
||||
|
||||
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
|
||||
if (pSupp->groupByStbName) {
|
||||
if (pInfo->stbUidList == NULL) {
|
||||
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
|
||||
if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
|
||||
qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
|
||||
}
|
||||
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
|
||||
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
|
||||
}
|
||||
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
|
||||
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
|
||||
buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);
|
||||
|
||||
char stbName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);
|
||||
pInfo->currGrpIdx++;
|
||||
} else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
|
||||
buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
|
||||
|
||||
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
|
||||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
|
||||
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
|
||||
|
||||
pInfo->currGrpIdx++;
|
||||
} else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
|
||||
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
|
||||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
|
||||
|
||||
pInfo->currGrpIdx++;
|
||||
} else {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
pInfo->currGrpIdx++;
|
||||
} else {
|
||||
uint64_t groupId = calcGroupId(dbName, strlen(dbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
} else {
|
||||
if (strlen(pSupp->dbName) != 0) {
|
||||
if (strlen(pSupp->stbName) != 0) {
|
||||
tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbName);
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbName, ctbNum, pRes);
|
||||
} else {
|
||||
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
uint64_t groupId = calcGroupId(dbName, strlen(dbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
}
|
||||
|
||||
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
|
||||
if (strlen(pSupp->dbNameFilter) != 0) {
|
||||
if (strlen(pSupp->stbNameFilter) != 0) {
|
||||
tb_uid_t uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
|
||||
} else {
|
||||
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
} else {
|
||||
int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
|
||||
}
|
||||
return pRes->info.rows > 0 ? pRes : NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||
SSDataBlock* pRes, char* dbName) {
|
||||
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
|
||||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
|
||||
fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
|
||||
}
|
||||
|
||||
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
|
||||
char stbName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);
|
||||
|
||||
char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
|
||||
uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
|
||||
pRes->info.id.groupId = groupId;
|
||||
|
||||
SMetaStbStats stats = {0};
|
||||
metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
|
||||
int64_t ctbNum = stats.ctbNum;
|
||||
|
||||
fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
|
||||
}
|
||||
|
||||
static void destoryTableCountScanOperator(void* param) {
|
||||
STableCountScanOperatorInfo* pTableCountScanInfo = param;
|
||||
blockDataDestroy(pTableCountScanInfo->pRes);
|
||||
|
||||
nodesDestroyList(pTableCountScanInfo->groupTags);
|
||||
taosArrayDestroy(pTableCountScanInfo->stbUidList);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue