fix: (last) add lock for writebatch
This commit is contained in:
parent
bc0d958f58
commit
41f2653f0a
|
@ -342,6 +342,7 @@ typedef struct {
|
||||||
rocksdb_writeoptions_t *writeoptions;
|
rocksdb_writeoptions_t *writeoptions;
|
||||||
rocksdb_readoptions_t *readoptions;
|
rocksdb_readoptions_t *readoptions;
|
||||||
rocksdb_writebatch_t *writebatch;
|
rocksdb_writebatch_t *writebatch;
|
||||||
|
TdThreadMutex writeBatchMutex;
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
} SRocksCache;
|
} SRocksCache;
|
||||||
|
|
||||||
|
|
|
@ -221,6 +221,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
||||||
|
|
||||||
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
|
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
|
||||||
|
|
||||||
|
TAOS_CHECK_GOTO(taosThreadMutexInit(&pTsdb->rCache.writeBatchMutex, NULL), &lino, _err6) ;
|
||||||
|
|
||||||
pTsdb->rCache.writebatch = writebatch;
|
pTsdb->rCache.writebatch = writebatch;
|
||||||
pTsdb->rCache.my_comparator = cmp;
|
pTsdb->rCache.my_comparator = cmp;
|
||||||
pTsdb->rCache.options = options;
|
pTsdb->rCache.options = options;
|
||||||
|
@ -232,6 +234,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
|
||||||
|
_err6:
|
||||||
|
rocksdb_writebatch_destroy(writebatch);
|
||||||
_err5:
|
_err5:
|
||||||
rocksdb_close(pTsdb->rCache.db);
|
rocksdb_close(pTsdb->rCache.db);
|
||||||
_err4:
|
_err4:
|
||||||
|
@ -250,6 +254,7 @@ _err:
|
||||||
|
|
||||||
static void tsdbCloseRocksCache(STsdb *pTsdb) {
|
static void tsdbCloseRocksCache(STsdb *pTsdb) {
|
||||||
rocksdb_close(pTsdb->rCache.db);
|
rocksdb_close(pTsdb->rCache.db);
|
||||||
|
(void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
|
||||||
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
|
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
|
||||||
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
|
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
|
||||||
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
|
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
|
||||||
|
@ -1077,7 +1082,9 @@ static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
|
(void)taosThreadMutexLock(&pTsdb->rCache.writeBatchMutex);
|
||||||
rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
|
rocksdb_writebatch_put(wb, (char *)pLastKey, ROCKS_KEY_LEN, rocks_value, vlen);
|
||||||
|
(void)taosThreadMutexUnlock(&pTsdb->rCache.writeBatchMutex);
|
||||||
|
|
||||||
taosMemoryFree(rocks_value);
|
taosMemoryFree(rocks_value);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue