diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 95b2f94f3f..8070067964 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -341,7 +341,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; - SArray* pTaskList; // SArray + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; @@ -569,6 +569,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); +void streamMetaInit(); +void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index d884120147..544512233e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -18,6 +18,7 @@ #include "dmNodes.h" #include "index.h" #include "qworker.h" +#include "tstream.h" static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) { } indexInit(tsNumOfCommitThreads); + streamMetaInit(); dmReportStartup("dnode-transport", "initialized"); dDebug("dnode is created, ptr:%p", pDnode); @@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupServer(pDnode); dmClearVars(pDnode); rpcCleanup(); + streamMetaCleanup(); indexCleanup(); taosConvDestroy(); dDebug("dnode is closed, ptr:%p", pDnode); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e1c030b5c5..eaf3175bfa 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -846,7 +846,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); - // return -1; } } pState->pTdbState->rocksdb = handle->db; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 49710b0934..8167a12f6d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -19,6 +19,13 @@ #include "tref.h" #include "ttimer.h" +static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; +static int32_t streamBackendId = 0; +static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } + +void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaCleanup() { taosCloseRef(streamBackendId); } + SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -85,8 +92,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackend = streamBackendInit(statePath); - pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup); - pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend); + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosMemoryFree(statePath); @@ -129,9 +135,7 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); - taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid); - // streamBackendCleanup(pMeta->streamBackend); - taosCloseRef(pMeta->streamBackendId); + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 4b3f76276c..18c60aff28 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -1,5 +1,7 @@ #include +#include "streamBackendRocksdb.h" +#include "tstream.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -9,21 +11,16 @@ using namespace std; class StreamStateEnv : public ::testing::Test { protected: virtual void SetUp() { - // initLog(); - // taosRemoveDir(path); - // SIndexOpts opts; - // opts.cacheSize = 1024 * 1024 * 4; - // int ret = indexOpen(&opts, path, &index); - // assert(ret == 0); + streamMetaInit(); + backend = streamBackendInit(path); } virtual void TearDown() { + streamMetaCleanup(); // indexClose(index); } const char *path = TD_TMP_DIR_PATH "stream"; - - // SIndexOpts* opts; - // SIndex* index; + void *backend; }; bool equalSBF(SScalableBf *left, SScalableBf *right) { @@ -212,7 +209,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) { // updateInfoDestroy(pSU7); } // TEST() -TEST_F(StreamStateEnv, test1) {} +TEST(StreamStateEnv, test1) {} // int main(int argc, char *argv[]) { // testing::InitGoogleTest(&argc, argv); // return RUN_ALL_TESTS();