fix(query): set the table schema correctly when the table is dropped.

This commit is contained in:
Haojun Liao 2023-04-18 09:03:30 +08:00
parent 2ae05b8cf6
commit 7d3e5aa3d3
1 changed files with 32 additions and 18 deletions

View File

@ -1881,8 +1881,8 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_
return pReader->pSchema; return pReader->pSchema;
} }
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, uid, -1, 1, &pReader->pSchema);
if (pReader->pSchema == NULL) { if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
} }
@ -1890,9 +1890,15 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_
} }
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
int32_t code = 0;
// always set the newest schema version in pReader->pSchema // always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, uid, -1, 1, &pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
} }
if (pReader->pSchema && sversion == pReader->pSchema->version) { if (pReader->pSchema && sversion == pReader->pSchema->version) {
@ -1905,7 +1911,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
} }
STSchema* ptr = NULL; STSchema* ptr = NULL;
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return NULL; return NULL;
@ -2014,6 +2020,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -2222,6 +2232,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (pSchema == NULL) { if (pSchema == NULL) {
return code; return code;
} }
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) { if (piSchema == NULL) {
return code; return code;
@ -3843,11 +3854,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno; return terrno;
} }
if (pReader->pSchema == NULL) { STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema;
pReader->pSchema = pTSchema; code = tsdbRowMergerInit(&merge, ps, &current, pTSchema);
}
code = tsdbRowMergerInit(&merge, pReader->pSchema, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -3891,7 +3899,14 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) {
return terrno;
}
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
@ -4000,10 +4015,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
int64_t uid = pScanInfo->uid; int64_t uid = pScanInfo->uid;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid); STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
if (pSchema == NULL) {
return terrno;
}
SColVal colVal = {0}; SColVal colVal = {0};
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
@ -5187,8 +5203,6 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
} }
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
int32_t sversion = 1;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0); metaReaderInit(&mr, pVnode->pMeta, 0);
int32_t code = metaGetTableEntryByUidCache(&mr, uid); int32_t code = metaGetTableEntryByUidCache(&mr, uid);
@ -5200,6 +5214,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
*suid = 0; *suid = 0;
// only child table and ordinary table is allowed, super table is not allowed.
if (mr.me.type == TSDB_CHILD_TABLE) { if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
*suid = mr.me.ctbEntry.suid; *suid = mr.me.ctbEntry.suid;
@ -5209,9 +5224,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
metaReaderClear(&mr); metaReaderClear(&mr);
return terrno; return terrno;
} }
sversion = mr.me.stbEntry.schemaRow.version; } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
sversion = mr.me.ntbEntry.schemaRow.version;
} else { } else {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
metaReaderClear(&mr); metaReaderClear(&mr);
@ -5219,9 +5232,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
} }
metaReaderClear(&mr); metaReaderClear(&mr);
*pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1);
return TSDB_CODE_SUCCESS; // get the newest table schema version
code = metaGetTbTSchemaEx(pVnode->pMeta, uid, -1, 1, pSchema);
return code;
} }
int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {