From 3e944d228ba1d2fcc1fb242094d2fde1d997ecb8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Aug 2023 21:48:06 +0800 Subject: [PATCH] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index dca58b9bdc..97e9154480 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -466,16 +466,16 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { taosThreadMutexInit(&pHandle->cfMutex, NULL); pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - // rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); + rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - // int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; - // rocksdb_env_set_low_priority_background_threads(env, nBGThread); - // rocksdb_env_set_high_priority_background_threads(env, nBGThread); + int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; + rocksdb_env_set_low_priority_background_threads(env, nBGThread); + rocksdb_env_set_high_priority_background_threads(env, nBGThread); rocksdb_cache_t* cache = rocksdb_cache_create_lru(dbMemLimit / 2); rocksdb_options_t* opts = rocksdb_options_create(); - // rocksdb_options_set_env(opts, env); + rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); @@ -484,8 +484,9 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { rocksdb_options_set_info_log_level(opts, 1); rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); + rocksdb_options_set_atomic_flush(opts, 1); - // pHandle->env = env; + pHandle->env = env; pHandle->dbOpt = opts; pHandle->cache = cache; pHandle->filterFactory = rocksdb_compactionfilterfactory_create( @@ -520,7 +521,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { _EXIT: rocksdb_options_destroy(opts); rocksdb_cache_destroy(cache); - // rocksdb_env_destroy(env); + rocksdb_env_destroy(env); taosThreadMutexDestroy(&pHandle->mutex); taosThreadMutexDestroy(&pHandle->cfMutex); taosHashCleanup(pHandle->cfInst); @@ -556,7 +557,7 @@ void streamBackendCleanup(void* arg) { rocksdb_close(pHandle->db); } rocksdb_options_destroy(pHandle->dbOpt); - // rocksdb_env_destroy(pHandle->env); + rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); SListNode* head = tdListPopHead(pHandle->list); @@ -789,7 +790,9 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; size_t len = 0; char* name = rocksdb_column_family_handle_get_name(p, &len); - qError("column name: name: %d", (int)len); + char buf[64] = {0}; + memcpy(buf, name, len); + qError("column name: name: %s, len: %d", buf, (int)len); taosMemoryFree(name); taosArrayPush(pHandle, &p); @@ -834,8 +837,9 @@ _ERROR: return code; } int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { - int code = -1; - char* err = NULL; + int code = 0; + char* err = NULL; + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); rocksdb_flushoptions_set_wait(flushOpt, 1); @@ -843,8 +847,8 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32 if (err != NULL) { qError("failed to flush db before streamBackend clean up, reason:%s", err); taosMemoryFree(err); + code = -1; } - code = 0; rocksdb_flushoptions_destroy(flushOpt); return code; } @@ -896,11 +900,10 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } // Get all cf and acquire cfWrappter - int32_t nCf = 0; // chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0); - // code = chkpPreFlushDb(pHandle->db, ppCf, nCf); - code = 0; + code = chkpPreFlushDb(pHandle->db, ppCf, nCf); if (code == 0) { code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); if (code != 0) { @@ -964,7 +967,13 @@ static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const cha rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - int ret = memcmp(aBuf, bBuf, aLen); + int ret = 0; + // qError("alen: %d, blen:%d", (int)aLen, (int)bLen); + if (aLen < bLen) { + ret = memcmp(aBuf, bBuf, aLen); + } else { + ret = memcmp(aBuf, bBuf, bLen); + } if (ret == 0) { if (aLen < bLen) return -1; @@ -1474,7 +1483,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1595,7 +1604,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - // rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);