fix: sys table scan and meta query
This commit is contained in:
parent
61b680241d
commit
fb2667d92d
|
@ -238,12 +238,12 @@ typedef struct SStoreSnapshotFn {
|
||||||
} SStoreSnapshotFn;
|
} SStoreSnapshotFn;
|
||||||
|
|
||||||
typedef struct SStoreMeta {
|
typedef struct SStoreMeta {
|
||||||
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
|
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
|
||||||
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
|
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
|
||||||
void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor
|
void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor
|
||||||
void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first); // metaResumeTbCursor
|
void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first, int8_t move); // metaResumeTbCursor
|
||||||
int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext
|
int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext
|
||||||
int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
|
int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
|
||||||
|
|
||||||
int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
|
int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
|
||||||
int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
|
int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
|
||||||
|
|
|
@ -136,7 +136,7 @@ typedef SVCreateTSmaReq SSmaCfg;
|
||||||
SMTbCursor* metaOpenTbCursor(void* pVnode);
|
SMTbCursor* metaOpenTbCursor(void* pVnode);
|
||||||
void metaCloseTbCursor(SMTbCursor* pTbCur);
|
void metaCloseTbCursor(SMTbCursor* pTbCur);
|
||||||
void metaPauseTbCursor(SMTbCursor* pTbCur);
|
void metaPauseTbCursor(SMTbCursor* pTbCur);
|
||||||
void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first);
|
void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first, int8_t move);
|
||||||
int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
|
int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
|
||||||
int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
|
int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
|
||||||
|
|
||||||
|
|
|
@ -248,7 +248,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
|
||||||
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
|
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
|
||||||
pTbCur->pMeta = pVnodeObj->pMeta;
|
pTbCur->pMeta = pVnodeObj->pMeta;
|
||||||
pTbCur->paused = 1;
|
pTbCur->paused = 1;
|
||||||
metaResumeTbCursor(pTbCur, 1);
|
metaResumeTbCursor(pTbCur, 1, 0);
|
||||||
return pTbCur;
|
return pTbCur;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +273,7 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) {
|
||||||
pTbCur->paused = 1;
|
pTbCur->paused = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
|
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first, int8_t move) {
|
||||||
if (pTbCur->paused) {
|
if (pTbCur->paused) {
|
||||||
metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK);
|
metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK);
|
||||||
|
|
||||||
|
@ -282,9 +282,11 @@ void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
|
||||||
if (first) {
|
if (first) {
|
||||||
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
|
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
|
||||||
} else {
|
} else {
|
||||||
int c = 0;
|
int c = 1;
|
||||||
tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c);
|
tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c);
|
||||||
if (c < 0) {
|
if (c == 0) {
|
||||||
|
if (move) tdbTbcMoveToNext(pTbCur->pDbc);
|
||||||
|
} else if (c < 0) {
|
||||||
tdbTbcMoveToPrev(pTbCur->pDbc);
|
tdbTbcMoveToPrev(pTbCur->pDbc);
|
||||||
} else {
|
} else {
|
||||||
tdbTbcMoveToNext(pTbCur->pDbc);
|
tdbTbcMoveToNext(pTbCur->pDbc);
|
||||||
|
|
|
@ -539,7 +539,7 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pCur == NULL) {
|
if (pInfo->pCur == NULL) {
|
||||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||||
} else {
|
} else {
|
||||||
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0);
|
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pSchema == NULL) {
|
if (pInfo->pSchema == NULL) {
|
||||||
|
@ -555,14 +555,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t restore = pInfo->restore;
|
|
||||||
pInfo->restore = false;
|
|
||||||
|
|
||||||
while (restore || ((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
|
|
||||||
if (restore) {
|
|
||||||
restore = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
while (((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
|
||||||
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
|
@ -616,7 +610,6 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
|
||||||
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
|
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
|
||||||
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, pDataBlock, pOperator->exprSupp.pFilterInfo);
|
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, pDataBlock, pOperator->exprSupp.pFilterInfo);
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
pInfo->restore = true;
|
|
||||||
|
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
|
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
|
||||||
|
@ -724,7 +717,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pCur == NULL) {
|
if (pInfo->pCur == NULL) {
|
||||||
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
|
||||||
} else {
|
} else {
|
||||||
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0);
|
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool blockFull = false;
|
bool blockFull = false;
|
||||||
|
@ -750,7 +743,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
|
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
|
||||||
pAPI->metaFn.cursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
|
|
||||||
blockFull = true;
|
blockFull = true;
|
||||||
} else {
|
} else {
|
||||||
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
|
||||||
|
@ -1354,7 +1346,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
||||||
firstMetaCursor = 1;
|
firstMetaCursor = 1;
|
||||||
}
|
}
|
||||||
if (!firstMetaCursor) {
|
if (!firstMetaCursor) {
|
||||||
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0);
|
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
Loading…
Reference in New Issue