diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 30efee42e5..ed8f99ec75 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -342,6 +342,7 @@ typedef struct { rocksdb_writeoptions_t *writeoptions; rocksdb_readoptions_t *readoptions; rocksdb_writebatch_t *writebatch; + TdThreadMutex writeBatchMutex; STSchema *pTSchema; } SRocksCache; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 00b7f38b8f..89a51eb0f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -221,6 +221,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); + TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6) ; + pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.options = options; @@ -232,6 +234,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { TAOS_RETURN(code); +_err6: + rocksdb_writebatch_destroy(writebatch); _err5: rocksdb_close(pTsdb->rCache.db); _err4: @@ -250,6 +254,7 @@ _err: static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_close(pTsdb->rCache.db); + (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); @@ -1077,7 +1082,9 @@ static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol } rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + (void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex); rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen); + (void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex); taosMemoryFree(rocks_value);