From 8ed2836a2b33c1d6f953b565a7bc8653abfe9e68 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 29 Aug 2023 09:52:03 +0800 Subject: [PATCH] refactor code --- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamMeta.c | 10 +++++----- source/libs/stream/src/streamSnapshot.c | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 00fdb7be78..39854d1824 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -139,7 +139,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamBackendTriggerChkp(void* pMeta, char* dst); -int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId); +int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1ebe82ba5c..40d5fdc86b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -909,7 +909,7 @@ _ERROR: taosArrayDestroy(refs); return code; } -int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId) { +int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) { if (arg == NULL) return 0; SStreamMeta* pMeta = arg; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a8fa037592..817ceed69c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -49,11 +49,6 @@ void metaRefMgtInit(); void metaRefMgtCleanup(); int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); -void metaRefMgtInit() { - taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL); - gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); -} - static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); @@ -72,6 +67,11 @@ void streamMetaCleanup() { metaRefMgtCleanup(); } +void metaRefMgtInit() { + taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL); + gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); +} + void metaRefMgtCleanup() { void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL); while (pIter) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 29e1232ee8..8a4500dd86 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -123,7 +123,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk if (taosIsDir(tdir)) { validChkp = 1; qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); - streamBackendAddInUseChkpPos(pMeta, chkpId); + streamBackendAddInUseChkp(pMeta, chkpId); } else { qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); }