From d5c0ca3163160d568804e0b1b199829d507bce22 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 8 Jun 2023 13:36:52 +0000 Subject: [PATCH 1/4] opt batch write --- source/libs/stream/inc/streamBackendRocksdb.h | 5 ++ source/libs/stream/src/streamBackendRocksdb.c | 54 +++++++++++++------ source/libs/stream/src/tstreamFileState.c | 11 +++- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 1cbd7b042c..da4e442f1a 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -122,12 +122,17 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); // batch func +int streamStateGetCfIdx(SStreamState* pState, const char* funcName); void* streamStateCreateBatch(); int32_t streamStateGetBatchSize(void* pBatch); void streamStateClearBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch); int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl); + +int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen, int64_t ttl, void* tmpBuf); + int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index df045eef20..4ad208988b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -210,7 +210,6 @@ void streamBackendDelCompare(void* backend, void* arg) { } void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); -int streamGetInit(SStreamState* pState, const char* funcName); // |key|-----value------| // |key|ttl|len|userData| @@ -557,14 +556,20 @@ typedef struct { int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; - - char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); - char* buf = p; - int32_t len = 0; - len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); - len += taosEncodeFixedI32((void**)&buf, key.len); - len += taosEncodeBinary((void**)&buf, (char*)value, vlen); - *dest = p; + int32_t len = 0; + if (*dest == NULL) { + char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); + char* buf = p; + len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); + len += taosEncodeFixedI32((void**)&buf, key.len); + len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + *dest = p; + } else { + char* buf = *dest; + len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); + len += taosEncodeFixedI32((void**)&buf, key.len); + len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + } return len; } /* @@ -713,7 +718,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); params[i].tableOpt = tableOpt; - int idx = streamGetInit(NULL, funcname); + int idx = streamStateGetCfIdx(NULL, funcname); SCfInit* cfPara = &ginitDict[idx]; rocksdb_comparator_t* compare = @@ -744,7 +749,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t char idstr[128] = {0}; sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); - int idx = streamGetInit(NULL, funcname); + int idx = streamStateGetCfIdx(NULL, funcname); RocksdbCfInst* inst = NULL; RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1); @@ -955,7 +960,7 @@ void streamStateDestroyCompar(void* arg) { taosMemoryFree(comp->comp); } -int streamGetInit(SStreamState* pState, const char* funcName) { +int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { int idx = -1; size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -1002,7 +1007,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) { - int idx = streamGetInit(pState, cfName); + int idx = streamStateGetCfIdx(pState, cfName); if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); @@ -1022,7 +1027,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(pState, funcname); \ + int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ @@ -1053,7 +1058,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(pState, funcname); \ + int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ @@ -1101,7 +1106,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(pState, funcname); \ + int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ @@ -2041,7 +2046,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - int i = streamGetInit(pState, cfName); + int i = streamStateGetCfIdx(pState, cfName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfName); @@ -2057,6 +2062,21 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr taosMemoryFree(ttlV); return 0; } +int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { + char buf[128] = {0}; + int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf); + char* ttlV = tmpBuf; + int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); + + rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[cfIdx].idx]; + rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); + + if (tmpBuf == NULL) { + taosMemoryFree(ttlV); + } + return 0; +} int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { char* err = NULL; rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index bfaeca89f6..dc9a1f80bb 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -350,6 +350,11 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, const int32_t BATCH_LIMIT = 256; SListNode* pNode = NULL; + int idx = streamStateGetCfIdx(pFileState->pFileStore, "state"); + + int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; + char* buf = taosMemoryCalloc(1, len); + void* batch = streamStateCreateBatch(); while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; @@ -360,9 +365,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; - code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0); + code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, &sKey, pPos->pRowBuff, pFileState->rowSize, + 0, buf); + memset(buf, 0, len); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } + taosMemoryFree(buf); + if (streamStateGetBatchSize(batch) > 0) { code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } From a67ef8ce916fec2c41b7e573f5338305b9cda94a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Jun 2023 01:27:08 +0000 Subject: [PATCH 2/4] opt batch write --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4ad208988b..c4e3f147e0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -877,7 +877,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); - // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); + rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId); return 0; } From 06b65ce5c1bbdf4632706cd84e2b24193b22b404 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Jun 2023 07:49:43 +0000 Subject: [PATCH 3/4] opt batch write --- source/libs/stream/src/streamBackendRocksdb.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c4e3f147e0..3581a4c0ff 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -89,6 +89,8 @@ void* streamBackendInit(const char* path) { pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); + rocksdb_env_set_low_priority_background_threads(env, tsNumOfSnodeStreamThreads); + rocksdb_env_set_high_priority_background_threads(env, tsNumOfSnodeStreamThreads); rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); @@ -96,13 +98,13 @@ void* streamBackendInit(const char* path) { rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_write_buffer_size(opts, 48 << 20); rocksdb_options_set_max_total_wal_size(opts, 128 << 20); rocksdb_options_set_recycle_log_file_num(opts, 6); - rocksdb_options_set_max_write_buffer_number(opts, 2); + rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_info_log_level(opts, 0); uint32_t dbLimit = nextPow2(tsMaxStreamBackendCache); rocksdb_options_set_db_write_buffer_size(opts, dbLimit << 20); + rocksdb_options_set_write_buffer_size(opts, (dbLimit << 20) / 2); pHandle->env = env; pHandle->dbOpt = opts; From d38e3835c097b1697e120a6bdb6bd296c756be5f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Jun 2023 08:12:39 +0000 Subject: [PATCH 4/4] opt batch write --- source/libs/stream/src/streamBackendRocksdb.c | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 3581a4c0ff..cebe4e8204 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -81,6 +81,8 @@ const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); void* streamBackendInit(const char* path) { + uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; + qDebug("start to init stream backend at %s", path); SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); pHandle->list = tdListNew(sizeof(SCfComparator)); @@ -89,22 +91,23 @@ void* streamBackendInit(const char* path) { pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - rocksdb_env_set_low_priority_background_threads(env, tsNumOfSnodeStreamThreads); - rocksdb_env_set_high_priority_background_threads(env, tsNumOfSnodeStreamThreads); - rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); + int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; + rocksdb_env_set_low_priority_background_threads(env, nBGThread); + rocksdb_env_set_high_priority_background_threads(env, nBGThread); + + rocksdb_cache_t* cache = rocksdb_cache_create_lru(dbMemLimit / 2); rocksdb_options_t* opts = rocksdb_options_create(); rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_max_total_wal_size(opts, 128 << 20); + rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_info_log_level(opts, 0); - uint32_t dbLimit = nextPow2(tsMaxStreamBackendCache); - rocksdb_options_set_db_write_buffer_size(opts, dbLimit << 20); - rocksdb_options_set_write_buffer_size(opts, (dbLimit << 20) / 2); + rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); + rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); pHandle->env = env; pHandle->dbOpt = opts;