From fede52d5b7743ec553d10eaac72af5d694f6568d Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 10 Sep 2024 17:48:47 +0800 Subject: [PATCH 1/3] enh: (last) rm rwritebatch --- source/dnode/vnode/src/inc/tsdb.h | 1 - source/dnode/vnode/src/tsdb/tsdbCache.c | 31 +++++++++++-------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 59d70889e5..c85c376c47 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -342,7 +342,6 @@ typedef struct { rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; - rocksdb_writebatch_t *rwritebatch; STSchema *pTSchema; } SRocksCache; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3a3e20612e..f4a423bfa7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -214,10 +214,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { } rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); - rocksdb_writebatch_t *rwritebatch = rocksdb_writebatch_create(); pTsdb->rCache.writebatch = writebatch; - pTsdb->rCache.rwritebatch = rwritebatch; pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; pTsdb->rCache.writeoptions = writeoptions; @@ -248,7 +246,6 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); - rocksdb_writebatch_destroy(pTsdb->rCache.rwritebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); @@ -258,8 +255,8 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { taosMemoryFree(pTsdb->rCache.pTSchema); } -static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { - rocksdb_writebatch_t *wb = read ? pTsdb->rCache.rwritebatch : pTsdb->rCache.writebatch; +static void rocksMayWrite(STsdb *pTsdb, bool force) { + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; int count = rocksdb_writebatch_count(wb); if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { @@ -516,8 +513,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); - rocksMayWrite(pTsdb, true, false); - rocksMayWrite(pTsdb, true, true); + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -667,8 +663,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); - rocksMayWrite(pTsdb, true, false); - rocksMayWrite(pTsdb, true, true); + rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); if (NULL != err) { @@ -862,7 +857,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra taosMemoryFree(pTSchema); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -903,7 +898,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { taosMemoryFree(pTSchema); - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -934,7 +929,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -973,7 +968,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1243,7 +1238,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray taosMemoryFreeClear(pToFree); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); @@ -1582,7 +1577,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } // store result back to rocks cache - wb = pTsdb->rCache.rwritebatch; + wb = pTsdb->rCache.writebatch; char *value = NULL; size_t vlen = 0; code = tsdbCacheSerialize(pLastCol, &value, &vlen); @@ -1597,7 +1592,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } if (wb) { - rocksMayWrite(pTsdb, false, true); + rocksMayWrite(pTsdb, false); } _exit: @@ -1797,6 +1792,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache if (remainCols && TARRAY_SIZE(remainCols) > 0) { (void)taosThreadMutexLock(&pTsdb->lruMutex); + rocksMayWrite(pTsdb, false); + for (int i = 0; i < TARRAY_SIZE(remainCols);) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); @@ -1944,7 +1941,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFreeClear(pLastCol); } - rocksMayWrite(pTsdb, true, false); + rocksMayWrite(pTsdb, true); _exit: (void)taosThreadMutexUnlock(&pTsdb->lruMutex); From 44a955218cfcedb9b9445e058fcb3a87e4177c95 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 11 Sep 2024 13:48:23 +0800 Subject: [PATCH 2/3] fix: (last) call rocksdb_write before rocksdb_multi_get --- source/dnode/vnode/src/inc/tsdb.h | 1 - source/dnode/vnode/src/tsdb/tsdbCache.c | 111 +++++++++--------------- 2 files changed, 40 insertions(+), 72 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c85c376c47..d4a33a5478 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -362,7 +362,6 @@ struct STsdb { SMemTable *imem; STsdbFS fs; // old SLRUCache *lruCache; - SCacheFlushState flushState; TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f4a423bfa7..53e20d459f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -267,7 +267,6 @@ static void rocksMayWrite(STsdb *pTsdb, bool force) { tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count, err); rocksdb_free(err); - // pTsdb->flushState.flush_count = 0; } rocksdb_writebatch_clear(wb); @@ -456,47 +455,23 @@ static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size TAOS_RETURN(TSDB_CODE_SUCCESS); } -static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) { - int32_t code = 0; - STsdb *pTsdb = state->pTsdb; - SRocksCache *rCache = &pTsdb->rCache; - rocksdb_writebatch_t *wb = rCache->writebatch; - char *rocks_value = NULL; - size_t vlen = 0; - - code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen); - if (code) { - tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - return; - } - - rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen); - - taosMemoryFree(rocks_value); - - if (++state->flush_count >= ROCKS_BATCH_SIZE) { - char *err = NULL; - - rocksdb_write(rCache->db, rCache->writeoptions, wb, &err); - if (NULL != err) { - tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - state->flush_count, err); - rocksdb_free(err); - } - - rocksdb_writebatch_clear(wb); - - state->flush_count = 0; - } -} +static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol); int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { SLastCol *pLastCol = (SLastCol *)value; if (pLastCol->dirty) { - tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud); + STsdb *pTsdb = (STsdb *)ud; + + int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol); + if (code) { + tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; + } pLastCol->dirty = 0; + + rocksMayWrite(pTsdb, false); } return 0; @@ -511,7 +486,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { (void)taosThreadMutexLock(&pTsdb->lruMutex); - taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); + taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb); rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); @@ -602,7 +577,7 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud SLastCol *pLastCol = (SLastCol *)value; if (pLastCol->dirty) { - tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud); + tsdbCacheFlushDirty(key, klen, pLastCol, ud); } for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) { @@ -639,11 +614,11 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, - TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + TAOS_LRU_PRIORITY_LOW, pTsdb); if (status != TAOS_LRU_STATUS_OK) { tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); - tsdbCacheFreeSLastColItem(pLastCol); code = TSDB_CODE_FAILED; + pLastCol = NULL; } _exit: @@ -661,7 +636,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) { SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; - taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); + taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb); rocksMayWrite(pTsdb, true); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); @@ -733,6 +708,10 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, char **values_list = NULL; size_t *values_list_sizes = NULL; + + // was written by caller + // rocksMayWrite(pTsdb, true); // flush writebatch cache + TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list, &values_list_sizes), NULL, _exit); @@ -857,7 +836,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra taosMemoryFree(pTSchema); } - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -898,7 +877,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) { taosMemoryFree(pTSchema); - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -929,7 +908,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -968,7 +947,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); } - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, false); (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1078,11 +1057,11 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge)); LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter, - NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + NULL, TAOS_LRU_PRIORITY_LOW, pTsdb); if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) { tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); - tsdbCacheFreeSLastColItem(pLRULastCol); code = TSDB_CODE_FAILED; + pLRULastCol = NULL; } _exit: @@ -1172,6 +1151,8 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray keys_list_sizes[i] = ROCKS_KEY_LEN; } + rocksMayWrite(pTsdb, true); // flush writebatch cache + code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list, &values_list_sizes); if (code) { @@ -1238,7 +1219,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray taosMemoryFreeClear(pToFree); } - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, false); taosMemoryFree(keys_list); taosMemoryFree(keys_list_sizes); @@ -1568,32 +1549,22 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, - TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + TAOS_LRU_PRIORITY_LOW, pTsdb); if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) { tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); - tsdbCacheFreeSLastColItem(pLastCol); - taosMemoryFree(pLastCol); + pLastCol = NULL; TAOS_CHECK_EXIT(TSDB_CODE_FAILED); } // store result back to rocks cache - wb = pTsdb->rCache.writebatch; - char *value = NULL; - size_t vlen = 0; - code = tsdbCacheSerialize(pLastCol, &value, &vlen); + code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol); if (code) { - tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); - } else { - SLastKey *key = &idxKey->key; - size_t klen = ROCKS_KEY_LEN; - rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen); - taosMemoryFree(value); + tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + TAOS_CHECK_EXIT(code); } } - if (wb) { - rocksMayWrite(pTsdb, false); - } + rocksMayWrite(pTsdb, false); _exit: taosArrayDestroy(lastrowTmpIndexArray); @@ -1633,6 +1604,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA keys_list_sizes[i] = ROCKS_KEY_LEN; } + rocksMayWrite(pTsdb, true); // flush writebatch cache + code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list, &values_list_sizes); if (code) { @@ -1681,11 +1654,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA } LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, - NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); + NULL, TAOS_LRU_PRIORITY_LOW, pTsdb); if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) { tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); - tsdbCacheFreeSLastColItem(pLastCol); - taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pToFree); TAOS_CHECK_EXIT(TSDB_CODE_FAILED); } @@ -1792,7 +1763,6 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache if (remainCols && TARRAY_SIZE(remainCols) > 0) { (void)taosThreadMutexLock(&pTsdb->lruMutex); - rocksMayWrite(pTsdb, false); for (int i = 0; i < TARRAY_SIZE(remainCols);) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; @@ -1907,6 +1877,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE keys_list_sizes[i] = klen; } + rocksMayWrite(pTsdb, true); // flush writebatch cache + TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes, &values_list, &values_list_sizes), NULL, _exit); @@ -1941,7 +1913,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE taosMemoryFreeClear(pLastCol); } - rocksMayWrite(pTsdb, true); + rocksMayWrite(pTsdb, false); _exit: (void)taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1983,9 +1955,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL); - pTsdb->flushState.pTsdb = pTsdb; - pTsdb->flushState.flush_count = 0; - _err: if (code) { tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); From 4d103c09d490f67fd5b6fb836d91449a8731433c Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 11 Sep 2024 19:22:04 +0800 Subject: [PATCH 3/3] fix: ignore tsdbCacheFlushDirty errcode --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 53e20d459f..74bd837775 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -577,7 +577,7 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud SLastCol *pLastCol = (SLastCol *)value; if (pLastCol->dirty) { - tsdbCacheFlushDirty(key, klen, pLastCol, ud); + (void)tsdbCacheFlushDirty(key, klen, pLastCol, ud); } for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {