fix: support fetching specific schema version from table
This commit is contained in:
parent
882f4f388c
commit
5fa27b27ba
|
@ -605,6 +605,10 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
||||||
* @param pCols
|
* @param pCols
|
||||||
*/
|
*/
|
||||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
||||||
|
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||||
|
printf("%s:%d ts: %" PRIi64 " sver:%d maxCols:%" PRIi16 " nCols:%" PRIi16 ", nRows:%d\n", __func__, __LINE__,
|
||||||
|
TD_ROW_KEY(pRow), TD_ROW_SVER(pRow), pCols->maxCols, pCols->numOfCols, pCols->numOfRows);
|
||||||
|
#endif
|
||||||
if (TD_IS_TP_ROW(pRow)) {
|
if (TD_IS_TP_ROW(pRow)) {
|
||||||
return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge);
|
return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge);
|
||||||
} else if (TD_IS_KV_ROW(pRow)) {
|
} else if (TD_IS_KV_ROW(pRow)) {
|
||||||
|
|
|
@ -79,7 +79,8 @@ struct STsdb {
|
||||||
struct STable {
|
struct STable {
|
||||||
uint64_t tid;
|
uint64_t tid;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
STSchema *pSchema;
|
STSchema *pSchema; // latest schema
|
||||||
|
STSchema *pCacheSchema; // cached cache
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TABLE_TID(t) (t)->tid
|
#define TABLE_TID(t) (t)->tid
|
||||||
|
@ -181,12 +182,15 @@ int tsdbUnlockRepo(STsdb *pTsdb);
|
||||||
|
|
||||||
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
|
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy,
|
||||||
int32_t version) {
|
int32_t version) {
|
||||||
if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) {
|
if (version < 0) {
|
||||||
taosMemoryFreeClear(pTable->pSchema);
|
return pTable->pSchema;
|
||||||
pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pTable->pSchema;
|
if (!pTable->pCacheSchema || (schemaVersion(pTable->pCacheSchema) != version)) {
|
||||||
|
taosMemoryFreeClear(pTable->pCacheSchema);
|
||||||
|
pTable->pCacheSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version);
|
||||||
|
}
|
||||||
|
return pTable->pCacheSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdbMemTable.h
|
// tsdbMemTable.h
|
||||||
|
|
|
@ -300,7 +300,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
||||||
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
|
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
|
||||||
if (!pSW) return NULL;
|
if (!pSW) return NULL;
|
||||||
|
|
||||||
tdInitTSchemaBuilder(&sb, sver);
|
tdInitTSchemaBuilder(&sb, pSW->version);
|
||||||
for (int i = 0; i < pSW->nCols; i++) {
|
for (int i = 0; i < pSW->nCols; i++) {
|
||||||
pSchema = pSW->pSchema + i;
|
pSchema = pSW->pSchema + i;
|
||||||
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
|
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
|
||||||
|
|
|
@ -441,7 +441,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
|
||||||
|
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
||||||
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, 1);
|
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
|
||||||
if (!pTSchema) {
|
if (!pTSchema) {
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -466,7 +466,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
||||||
pTbData = (STbData *)pNode->pData;
|
pTbData = (STbData *)pNode->pData;
|
||||||
|
|
||||||
pCommitIter = pCommith->iters + i;
|
pCommitIter = pCommith->iters + i;
|
||||||
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1); // TODO: schema version
|
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1);
|
||||||
|
|
||||||
if (pTSchema) {
|
if (pTSchema) {
|
||||||
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
|
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
|
||||||
|
@ -475,7 +475,8 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
||||||
pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable));
|
pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable));
|
||||||
pCommitIter->pTable->uid = pTbData->uid;
|
pCommitIter->pTable->uid = pTbData->uid;
|
||||||
pCommitIter->pTable->tid = pTbData->uid;
|
pCommitIter->pTable->tid = pTbData->uid;
|
||||||
pCommitIter->pTable->pSchema = pTSchema; // metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0);
|
pCommitIter->pTable->pSchema = pTSchema;
|
||||||
|
pCommitIter->pTable->pCacheSchema = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(pSlIter);
|
tSkipListDestroyIter(pSlIter);
|
||||||
|
@ -490,6 +491,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
|
||||||
tSkipListDestroyIter(pCommith->iters[i].pIter);
|
tSkipListDestroyIter(pCommith->iters[i].pIter);
|
||||||
if (pCommith->iters[i].pTable) {
|
if (pCommith->iters[i].pTable) {
|
||||||
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
|
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
|
||||||
|
tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema);
|
||||||
taosMemoryFreeClear(pCommith->iters[i].pTable);
|
taosMemoryFreeClear(pCommith->iters[i].pTable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -914,7 +916,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
|
||||||
while (bidx < nBlocks) {
|
while (bidx < nBlocks) {
|
||||||
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
|
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
|
||||||
// Set commit table
|
// Set commit table
|
||||||
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version
|
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, -1); // TODO: schema version
|
||||||
if (!pTSchema) {
|
if (!pTSchema) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue