enh: (last) rm rwritebatch

This commit is contained in:
Shungang Li 2024-09-10 17:48:47 +08:00
parent c33b57fa65
commit fede52d5b7
2 changed files with 14 additions and 18 deletions

View File

@ -342,7 +342,6 @@ typedef struct {
rocksdb_writeoptions_t *writeoptions; rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions; rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t *writebatch;
rocksdb_writebatch_t *rwritebatch;
STSchema *pTSchema; STSchema *pTSchema;
} SRocksCache; } SRocksCache;

View File

@ -214,10 +214,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
} }
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
rocksdb_writebatch_t *rwritebatch = rocksdb_writebatch_create();
pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.rwritebatch = rwritebatch;
pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.my_comparator = cmp;
pTsdb->rCache.options = options; pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.writeoptions = writeoptions;
@ -248,7 +246,6 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_close(pTsdb->rCache.db); rocksdb_close(pTsdb->rCache.db);
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
rocksdb_writebatch_destroy(pTsdb->rCache.rwritebatch);
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
rocksdb_options_destroy(pTsdb->rCache.options); rocksdb_options_destroy(pTsdb->rCache.options);
@ -258,8 +255,8 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosMemoryFree(pTsdb->rCache.pTSchema); taosMemoryFree(pTsdb->rCache.pTSchema);
} }
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { static void rocksMayWrite(STsdb *pTsdb, bool force) {
rocksdb_writebatch_t *wb = read ? pTsdb->rCache.rwritebatch : pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
int count = rocksdb_writebatch_count(wb); int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
@ -516,8 +513,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
rocksMayWrite(pTsdb, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -667,8 +663,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
rocksMayWrite(pTsdb, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) { if (NULL != err) {
@ -862,7 +857,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -903,7 +898,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -934,7 +929,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -973,7 +968,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -1243,7 +1238,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
taosMemoryFreeClear(pToFree); taosMemoryFreeClear(pToFree);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
taosMemoryFree(keys_list); taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes); taosMemoryFree(keys_list_sizes);
@ -1582,7 +1577,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
// store result back to rocks cache // store result back to rocks cache
wb = pTsdb->rCache.rwritebatch; wb = pTsdb->rCache.writebatch;
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
code = tsdbCacheSerialize(pLastCol, &value, &vlen); code = tsdbCacheSerialize(pLastCol, &value, &vlen);
@ -1597,7 +1592,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
if (wb) { if (wb) {
rocksMayWrite(pTsdb, false, true); rocksMayWrite(pTsdb, false);
} }
_exit: _exit:
@ -1797,6 +1792,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
if (remainCols && TARRAY_SIZE(remainCols) > 0) { if (remainCols && TARRAY_SIZE(remainCols) > 0) {
(void)taosThreadMutexLock(&pTsdb->lruMutex); (void)taosThreadMutexLock(&pTsdb->lruMutex);
rocksMayWrite(pTsdb, false);
for (int i = 0; i < TARRAY_SIZE(remainCols);) { for (int i = 0; i < TARRAY_SIZE(remainCols);) {
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
@ -1944,7 +1941,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
_exit: _exit:
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);