From b1c50e23b694a18db7756521b4d668d644267517 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 15 May 2024 11:38:43 +0800 Subject: [PATCH] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 34312dab54..bcc5da81ea 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1043,8 +1043,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { taosMemoryFreeClear(err); goto _ERROR; } - - rocksdb_checkpoint_create(cp, path, 64 << 20, &err); + rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err); if (err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); @@ -1192,20 +1191,23 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf); stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); - if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) { - if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { - stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); - } else { - stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, - taosGetTimestampMs() - st); - } + int64_t written = atomic_load_64(&pTaskDb->dataWritten); + + if (written > 0) { + stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); + code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf); } else { - stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir); + stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); + } + if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { + stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); + } else { + stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, + taosGetTimestampMs() - st); } code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); - pTaskDb->dataWritten = 0; - + atomic_store_64(&pTaskDb->dataWritten, 0); pTaskDb->chkpId = chkpId; _EXIT: @@ -1588,7 +1590,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { if (*dest == NULL) { size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len; char* p = taosMemoryCalloc(1, size); - char* buf = p; + char* buf = p; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); len += taosEncodeFixedI32((void**)&buf, key.rawLen); @@ -1846,7 +1848,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); // rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); - rocksdb_options_set_recycle_log_file_num(opts, 6); + // rocksdb_options_set_ecycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_info_log_level(opts, 1); rocksdb_options_set_db_write_buffer_size(opts, 256 << 20); @@ -2563,7 +2565,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - wrapper->dataWritten += 1; \ + atomic_add_fetch_64(&wrapper->dataWritten, 1); \ char toString[128] = {0}; \ if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ @@ -2640,7 +2642,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - wrapper->dataWritten += 1; \ + atomic_add_fetch_64(&wrapper->dataWritten, 1); \ char toString[128] = {0}; \ if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ @@ -2681,7 +2683,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { stDebug("streamStateClear_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + atomic_add_fetch_64(&wrapper->dataWritten, 1); char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; @@ -3705,7 +3707,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + atomic_add_fetch_64(&wrapper->dataWritten, 1); int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { @@ -3739,7 +3741,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + + atomic_add_fetch_64(&wrapper->dataWritten, 1); rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); @@ -3758,7 +3761,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { char* err = NULL; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + atomic_add_fetch_64(&wrapper->dataWritten, 1); rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { stError("streamState failed to write batch, err:%s", err);