From fede52d5b7743ec553d10eaac72af5d694f6568d Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 10 Sep 2024 17:48:47 +0800 Subject: [PATCH] enh: (last) rm rwritebatch --- source/dnode/vnode/src/inc/tsdb.h | 1 - source/dnode/vnode/src/tsdb/tsdbCache.c | 31 +++++++++++-------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 59d70889e5..c85c376c47 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -342,7 +342,6 @@ typedef struct { rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; - rocksdb_writebatch_t *rwritebatch; STSchema *pTSchema; } SRocksCache; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3a3e20612e..f4a423bfa7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -214,10 +214,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); - rocksdb_writebatch_t *rwritebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; - pTsdb->rCache.rwritebatch = rwritebatch; pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; @@ -248,7 +246,6 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); - rocksdb_writebatch_destroy(pTsdb->rCache.rwritebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); @@ -258,8 +255,8 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosMemoryFree(pTsdb->rCache.pTSchema); } -static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { - rocksdb_writebatch_t *wb = read ? pTsdb->rCache.rwritebatch : pTsdb->rCache.writebatch; +static void rocksMayWrite(STsdb *pTsdb, bool force) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; int count = rocksdb_writebatch_count(wb); if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { @@ -516,8 +513,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); - rocksMayWrite(pTsdb, true, false); - rocksMayWrite(pTsdb, true, true); + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -667,8 +663,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); - rocksMayWrite(pTsdb, true, false); - rocksMayWrite(pTsdb, true, true); + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { @@ -862,7 +857,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra taosMemoryFree(pTSchema); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -903,7 +898,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { taosMemoryFree(pTSchema); - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -934,7 +929,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -973,7 +968,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1243,7 +1238,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray taosMemoryFreeClear(pToFree); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); @@ -1582,7 +1577,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } // store result back to rocks cache - wb = pTsdb->rCache.rwritebatch; + wb = pTsdb->rCache.writebatch; char *value = NULL; size_t vlen = 0; code = tsdbCacheSerialize(pLastCol, &value, &vlen); @@ -1597,7 +1592,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb, false, true); + rocksMayWrite(pTsdb, false); } _exit: @@ -1797,6 +1792,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache if (remainCols && TARRAY_SIZE(remainCols) > 0) { (void)taosThreadMutexLock(&pTsdb->lruMutex); + rocksMayWrite(pTsdb, false); + for (int i = 0; i < TARRAY_SIZE(remainCols);) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); @@ -1944,7 +1941,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFreeClear(pLastCol); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); _exit: (void)taosThreadMutexUnlock(&pTsdb->lruMutex);