Merge pull request #25149 from taosdata/fix/3_liaohj

enh: meta pause/resume for sys table scan
This commit is contained in:
Haojun Liao 2024-03-22 17:32:35 +08:00 committed by GitHub
commit bd52416af9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 125 additions and 104 deletions

View File

@ -238,12 +238,12 @@ typedef struct SStoreSnapshotFn {
} SStoreSnapshotFn;
typedef struct SStoreMeta {
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor
void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first); // metaResumeTbCursor
int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext
int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor
void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first, int8_t move); // metaResumeTbCursor
int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext
int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);

View File

@ -136,7 +136,7 @@ typedef SVCreateTSmaReq SSmaCfg;
SMTbCursor* metaOpenTbCursor(void* pVnode);
void metaCloseTbCursor(SMTbCursor* pTbCur);
void metaPauseTbCursor(SMTbCursor* pTbCur);
void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first);
void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first, int8_t move);
int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);

View File

@ -248,7 +248,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
pTbCur->pMeta = pVnodeObj->pMeta;
pTbCur->paused = 1;
metaResumeTbCursor(pTbCur, 1);
metaResumeTbCursor(pTbCur, 1, 0);
return pTbCur;
}
@ -273,7 +273,7 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) {
pTbCur->paused = 1;
}
}
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first, int8_t move) {
if (pTbCur->paused) {
metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK);
@ -282,9 +282,11 @@ void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
if (first) {
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
} else {
int c = 0;
int c = 1;
tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c);
if (c < 0) {
if (c == 0) {
if (move) tdbTbcMoveToNext(pTbCur->pDbc);
} else if (c < 0) {
tdbTbcMoveToPrev(pTbCur->pDbc);
} else {
tdbTbcMoveToNext(pTbCur->pDbc);

View File

@ -433,95 +433,113 @@ static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
return false;
}
static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
static SSDataBlock* doOptimizeTableNameFilter(SOperatorInfo* pOperator, SSDataBlock* dataBlock, char* dbname) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
int32_t numOfRows = 0;
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
SMetaReader smrTable = {0};
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
pAPI->metaReaderFn.clearReader(&smrTable);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
pAPI->metaReaderFn.clearReader(&smrTable);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
if (smrTable.me.type == TSDB_CHILD_TABLE) {
int64_t suid = smrTable.me.ctbEntry.suid;
pAPI->metaReaderFn.clearReader(&smrTable);
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
pAPI->metaReaderFn.clearReader(&smrTable);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper* schemaRow = NULL;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
schemaRow = &smrTable.me.stbEntry.schemaRow;
STR_TO_VARSTR(typeName, "CHILD_TABLE");
} else if (smrTable.me.type == TSDB_NORMAL_TABLE) {
schemaRow = &smrTable.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
pAPI->metaReaderFn.clearReader(&smrTable);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator);
qDebug("get cols success, total rows:%" PRIu64 ", current:%" PRId64 " %s", pInfo->loadInfo.totalRows,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo));
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
int32_t doExtractDbName(char* dbname, SSysTableScanInfo* pInfo, SStorageAPI* pAPI) {
SName sn = {0};
const char* db = NULL;
int32_t vgId = 0;
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, varDataVal(dbname));
varDataSetLen(dbname, strlen(varDataVal(dbname)));
// optimize when sql like where table_name='tablename' and xxx.
if (pInfo->req.filterTb[0]) {
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->req.filterTb);
return TSDB_CODE_SUCCESS;
}
SMetaReader smrTable = {0};
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
int32_t code = pAPI->metaReaderFn.getTableEntryByName(&smrTable, pInfo->req.filterTb);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
pAPI->metaReaderFn.clearReader(&smrTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info;
int32_t numOfRows = 0;
int32_t ret = 0;
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSDataBlock* pDataBlock = NULL;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
pAPI->metaReaderFn.clearReader(&smrTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
if (smrTable.me.type == TSDB_CHILD_TABLE) {
int64_t suid = smrTable.me.ctbEntry.suid;
pAPI->metaReaderFn.clearReader(&smrTable);
pAPI->metaReaderFn.initReader(&smrTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = pAPI->metaReaderFn.getTableEntryByUid(&smrTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
pAPI->metaReaderFn.clearReader(&smrTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper* schemaRow = NULL;
if (smrTable.me.type == TSDB_SUPER_TABLE) {
schemaRow = &smrTable.me.stbEntry.schemaRow;
STR_TO_VARSTR(typeName, "CHILD_TABLE");
} else if (smrTable.me.type == TSDB_NORMAL_TABLE) {
schemaRow = &smrTable.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
pAPI->metaReaderFn.clearReader(&smrTable);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
blockDataCleanup(pInfo->pRes);
pDataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);
doExtractDbName(dbname, pInfo, pAPI);
// optimize when sql like where table_name='tablename' and xxx.
if (pInfo->req.filterTb[0]) {
SSDataBlock* p = doOptimizeTableNameFilter(pOperator, pDataBlock, dbname);
blockDataDestroy(pDataBlock);
return p;
}
int32_t ret = 0;
if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
} else {
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0);
}
if (pInfo->pSchema == NULL) {
@ -532,26 +550,20 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if (!pInfo->pCur || !pInfo->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("sysTableScanUserCols failed since %s", terrstr(terrno));
blockDataDestroy(dataBlock);
blockDataDestroy(pDataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
int32_t restore = pInfo->restore;
pInfo->restore = false;
while (restore || ((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
if (restore) {
restore = false;
}
while (((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
SSchemaWrapper* schemaRow = NULL;
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table");
qDebug("sysTableScanUserCols cursor get super table, %s", GET_TASKID(pTaskInfo));
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
@ -559,7 +571,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
continue;
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table");
qDebug("sysTableScanUserCols cursor get child table, %s", GET_TASKID(pTaskInfo));
STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
@ -568,13 +581,14 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
schemaRow = *(SSchemaWrapper**)schema;
} else {
SMetaReader smrSuperTable = {0};
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
pAPI->metaReaderFn.initReader(&smrSuperTable, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
int code = pAPI->metaReaderFn.getTableEntryByUid(&smrSuperTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by pAPI->metaReaderFn.getTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d, %s", suid, code, GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(&smrSuperTable);
blockDataDestroy(dataBlock);
blockDataDestroy(pDataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
@ -584,34 +598,35 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pAPI->metaReaderFn.clearReader(&smrSuperTable);
}
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table");
qDebug("sysTableScanUserCols cursor get normal table, %s", GET_TASKID(pTaskInfo));
schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow;
STR_TO_VARSTR(typeName, "NORMAL_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
} else {
qDebug("sysTableScanUserCols cursor get invalid table");
qDebug("sysTableScanUserCols cursor get invalid table, %s", GET_TASKID(pTaskInfo));
continue;
}
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, pDataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
pInfo->restore = true;
if (pInfo->pRes->info.rows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
break;
}
} else {
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, pDataBlock, tableName, schemaRow, typeName);
}
}
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, pDataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
blockDataDestroy(dataBlock);
blockDataDestroy(pDataBlock);
if (ret != 0) {
pAPI->metaFn.closeTableMetaCursor(pInfo->pCur);
pInfo->pCur = NULL;
@ -619,8 +634,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
qDebug("sysTableScanUserCols get cols success, rows:%" PRIu64, pInfo->loadInfo.totalRows);
qDebug("get cols success, rows:%" PRIu64 " %s", pInfo->loadInfo.totalRows, GET_TASKID(pTaskInfo));
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
@ -702,6 +716,8 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
int32_t ret = 0;
if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
} else {
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0);
}
bool blockFull = false;
@ -727,7 +743,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
pAPI->metaFn.cursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
blockFull = true;
} else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
@ -741,6 +756,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
break;
}
@ -749,6 +765,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
if (numOfRows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
@ -1329,7 +1346,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
firstMetaCursor = 1;
}
if (!firstMetaCursor) {
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0);
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 1);
}
blockDataCleanup(pInfo->pRes);

View File

@ -19,6 +19,8 @@ from util.common import *
from util.sqlset import *
class TDTestCase:
updatecfgDict = {'qDebugFlag':135 , 'mDebugFlag':135}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)