tsdb/cache: cache schema to reuse
This commit is contained in:
parent
fb48ab9298
commit
cf3e84e79d
|
@ -342,7 +342,9 @@ typedef struct {
|
||||||
rocksdb_writeoptions_t *writeoptions;
|
rocksdb_writeoptions_t *writeoptions;
|
||||||
rocksdb_readoptions_t *readoptions;
|
rocksdb_readoptions_t *readoptions;
|
||||||
rocksdb_writebatch_t *writebatch;
|
rocksdb_writebatch_t *writebatch;
|
||||||
TdThreadMutex writeBatchMutex;
|
TdThreadMutex writeBatchMutex;
|
||||||
|
tb_uid_t suid;
|
||||||
|
tb_uid_t uid;
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
} SRocksCache;
|
} SRocksCache;
|
||||||
|
|
||||||
|
|
|
@ -221,7 +221,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
||||||
|
|
||||||
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
|
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6) ;
|
TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6);
|
||||||
|
|
||||||
pTsdb->rCache.writebatch = writebatch;
|
pTsdb->rCache.writebatch = writebatch;
|
||||||
pTsdb->rCache.my_comparator = cmp;
|
pTsdb->rCache.my_comparator = cmp;
|
||||||
|
@ -230,6 +230,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
||||||
pTsdb->rCache.readoptions = readoptions;
|
pTsdb->rCache.readoptions = readoptions;
|
||||||
pTsdb->rCache.flushoptions = flushoptions;
|
pTsdb->rCache.flushoptions = flushoptions;
|
||||||
pTsdb->rCache.db = db;
|
pTsdb->rCache.db = db;
|
||||||
|
pTsdb->rCache.suid = -1;
|
||||||
|
pTsdb->rCache.uid = -1;
|
||||||
pTsdb->rCache.pTSchema = NULL;
|
pTsdb->rCache.pTSchema = NULL;
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
@ -1302,6 +1304,22 @@ _exit:
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) {
|
||||||
|
if (suid) {
|
||||||
|
if (pTsdb->rCache.suid == suid) {
|
||||||
|
pTsdb->rCache.uid = uid;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
} else if (pTsdb->rCache.uid == uid) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTsdb->rCache.suid = suid;
|
||||||
|
pTsdb->rCache.uid = uid;
|
||||||
|
tDestroyTSchema(pTsdb->rCache.pTSchema);
|
||||||
|
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
|
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
|
||||||
SRow **aRow) {
|
SRow **aRow) {
|
||||||
int32_t code = 0, lino = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
@ -1314,7 +1332,11 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
||||||
SArray *ctxArray = NULL;
|
SArray *ctxArray = NULL;
|
||||||
SSHashObj *iColHash = NULL;
|
SSHashObj *iColHash = NULL;
|
||||||
|
|
||||||
|
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid), &lino, _exit);
|
||||||
|
pTSchema = pTsdb->rCache.pTSchema;
|
||||||
|
/*
|
||||||
TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit);
|
TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit);
|
||||||
|
*/
|
||||||
|
|
||||||
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
||||||
int32_t nCol = pTSchema->numOfCols;
|
int32_t nCol = pTSchema->numOfCols;
|
||||||
|
@ -1393,7 +1415,7 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosMemoryFreeClear(pTSchema);
|
// taosMemoryFreeClear(pTSchema);
|
||||||
taosArrayDestroy(ctxArray);
|
taosArrayDestroy(ctxArray);
|
||||||
tSimpleHashCleanup(iColHash);
|
tSimpleHashCleanup(iColHash);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue