fix(query): not lock when reading meta, and do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-21 15:45:28 +08:00
parent e18f056c9f
commit 6b2821cd28
1 changed files with 100 additions and 83 deletions

View File

@ -433,93 +433,109 @@ static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
return false; return false;
} }
static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { static SSDataBlock* doOptimizeTableNameFilter(SOperatorInfo* pOperator, SSDataBlock* dataBlock, char* dbname) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) { int32_t numOfRows = 0;
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
SMetaReader smrTable = {0};
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
pAPI->metaReaderFn.clearReader(&smrTable);
pInfo->loadInfo.totalRows = 0;
return NULL; return NULL;
} }
blockDataCleanup(pInfo->pRes); if (smrTable.me.type == TSDB_SUPER_TABLE) {
int32_t numOfRows = 0; pAPI->metaReaderFn.clearReader(&smrTable);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS); if (smrTable.me.type == TSDB_CHILD_TABLE) {
blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity); int64_t suid = smrTable.me.ctbEntry.suid;
pAPI->metaReaderFn.clearReader(&smrTable);
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
pAPI->metaReaderFn.clearReader(&smrTable);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper* schemaRow = NULL;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
schemaRow = &smrTable.me.stbEntry.schemaRow;
STR_TO_VARSTR(typeName, "CHILD_TABLE");
} else if (smrTable.me.type == TSDB_NORMAL_TABLE) {
schemaRow = &smrTable.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
pAPI->metaReaderFn.clearReader(&smrTable);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator);
qDebug("get cols success, total rows:%" PRIu64 ", current:%d %s", pInfo->loadInfo.totalRows, pInfo->pRes->info.rows,
GET_TASKID(pTaskInfo));
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
int32_t doExtractDbName(char* dbname, SSysTableScanInfo* pInfo, SStorageAPI* pAPI) {
SName sn = {0};
const char* db = NULL; const char* db = NULL;
int32_t vgId = 0; int32_t vgId = 0;
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, varDataVal(dbname)); tNameGetDbName(&sn, varDataVal(dbname));
varDataSetLen(dbname, strlen(varDataVal(dbname))); varDataSetLen(dbname, strlen(varDataVal(dbname)));
// optimize when sql like where table_name='tablename' and xxx. return TSDB_CODE_SUCCESS;
if (pInfo->req.filterTb[0]) { }
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
SMetaReader smrTable = {0}; static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb); SStorageAPI* pAPI = &pTaskInfo->storageAPI;
if (code != TSDB_CODE_SUCCESS) { SSysTableScanInfo* pInfo = pOperator->info;
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly int32_t numOfRows = 0;
pAPI->metaReaderFn.clearReader(&smrTable); int32_t ret = 0;
blockDataDestroy(dataBlock); char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
pInfo->loadInfo.totalRows = 0; SSDataBlock* pDataBlock = NULL;
return NULL;
}
if (smrTable.me.type == TSDB_SUPER_TABLE) { if (pOperator->status == OP_EXEC_DONE) {
pAPI->metaReaderFn.clearReader(&smrTable); return NULL;
blockDataDestroy(dataBlock); }
pInfo->loadInfo.totalRows = 0;
return NULL; blockDataCleanup(pInfo->pRes);
}
pDataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
if (smrTable.me.type == TSDB_CHILD_TABLE) { blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);
int64_t suid = smrTable.me.ctbEntry.suid; doExtractDbName(dbname, pInfo, pAPI);
pAPI->metaReaderFn.clearReader(&smrTable);
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); // optimize when sql like where table_name='tablename' and xxx.
code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid); if (pInfo->req.filterTb[0]) {
if (code != TSDB_CODE_SUCCESS) { SSDataBlock* p = doOptimizeTableNameFilter(pOperator, pDataBlock, dbname);
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly blockDataDestroy(pDataBlock);
pAPI->metaReaderFn.clearReader(&smrTable); return p;
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper* schemaRow = NULL;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
schemaRow = &smrTable.me.stbEntry.schemaRow;
STR_TO_VARSTR(typeName, "CHILD_TABLE");
} else if (smrTable.me.type == TSDB_NORMAL_TABLE) {
schemaRow = &smrTable.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
pAPI->metaReaderFn.clearReader(&smrTable);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
int32_t ret = 0;
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
} else { } else {
@ -534,7 +550,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if (!pInfo->pCur || !pInfo->pSchema) { if (!pInfo->pCur || !pInfo->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("sysTableScanUserCols failed since %s", terrstr(terrno)); qError("sysTableScanUserCols failed since %s", terrstr(terrno));
blockDataDestroy(dataBlock); blockDataDestroy(pDataBlock);
pInfo->loadInfo.totalRows = 0; pInfo->loadInfo.totalRows = 0;
return NULL; return NULL;
} }
@ -553,7 +569,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
SSchemaWrapper* schemaRow = NULL; SSchemaWrapper* schemaRow = NULL;
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) { if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table"); qDebug("sysTableScanUserCols cursor get super table, %s", GET_TASKID(pTaskInfo));
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) { if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow); SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
@ -561,7 +577,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
} }
continue; continue;
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) { } else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table"); qDebug("sysTableScanUserCols cursor get child table, %s", GET_TASKID(pTaskInfo));
STR_TO_VARSTR(typeName, "CHILD_TABLE"); STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
@ -570,13 +587,14 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
schemaRow = *(SSchemaWrapper**)schema; schemaRow = *(SSchemaWrapper**)schema;
} else { } else {
SMetaReader smrSuperTable = {0}; SMetaReader smrSuperTable = {0};
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid); int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly // terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code); qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d, %s", suid, code, GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&smrSuperTable); pAPI->metaReaderFn.clearReader(&smrSuperTable);
blockDataDestroy(dataBlock); blockDataDestroy(pDataBlock);
pInfo->loadInfo.totalRows = 0; pInfo->loadInfo.totalRows = 0;
return NULL; return NULL;
} }
@ -586,17 +604,17 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pAPI->metaReaderFn.clearReader(&smrSuperTable); pAPI->metaReaderFn.clearReader(&smrSuperTable);
} }
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) { } else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table"); qDebug("sysTableScanUserCols cursor get normal table, %s", GET_TASKID(pTaskInfo));
schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow; schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE"); STR_TO_VARSTR(typeName, "NORMAL_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
} else { } else {
qDebug("sysTableScanUserCols cursor get invalid table"); qDebug("sysTableScanUserCols cursor get invalid table, %s", GET_TASKID(pTaskInfo));
continue; continue;
} }
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) { if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, pDataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
pInfo->restore = true; pInfo->restore = true;
@ -605,17 +623,17 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
break; break;
} }
} else { } else {
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName); sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, pDataBlock, tableName, schemaRow, typeName);
} }
} }
if (numOfRows > 0) { if (numOfRows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur); pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, pDataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
} }
blockDataDestroy(dataBlock); blockDataDestroy(pDataBlock);
if (ret != 0) { if (ret != 0) {
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
@ -623,8 +641,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
} }
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
qDebug("sysTableScanUserCols get cols success, rows:%" PRIu64, pInfo->loadInfo.totalRows); qDebug("get cols success, rows:%" PRIu64 " %s", pInfo->loadInfo.totalRows, GET_TASKID(pTaskInfo));
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }