cache/commit: flush cache when tsdb commits data

This commit is contained in:
Minglei Jin 2023-05-08 11:14:59 +08:00
parent 198ce399d2
commit 2eec395720
4 changed files with 31 additions and 4 deletions

View File

@ -346,6 +346,7 @@ struct STsdbFS {
typedef struct {
rocksdb_t *db;
rocksdb_options_t *options;
rocksdb_flushoptions_t *flushoptions;
rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch;

View File

@ -178,6 +178,7 @@ int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);

View File

@ -98,12 +98,19 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
goto _err3;
}
rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create();
if (NULL == flushoptions) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err4;
}
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions;
pTsdb->rCache.readoptions = readoptions;
pTsdb->rCache.flushoptions = flushoptions;
pTsdb->rCache.db = db;
taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL);
@ -122,6 +129,7 @@ _err:
static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_close(pTsdb->rCache.db);
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
@ -129,6 +137,20 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->rCache.rMutex);
}
int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
code = -1;
}
return code;
}
SLastCol *tsdbCacheDeserialize(char const *value) {
if (!value) {
return NULL;

View File

@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
code = tsdbCommit(pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCacheCommit(pVnode->pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_IS_RSMA(pVnode)) {
code = smaCommit(pVnode->pSma, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);