factor code

This commit is contained in:
yihaoDeng 2023-05-11 06:37:57 +00:00
parent d473fcf6b0
commit f652f71d68
5 changed files with 22 additions and 17 deletions

View File

@ -569,6 +569,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);

View File

@ -18,6 +18,7 @@
#include "dmNodes.h" #include "dmNodes.h"
#include "index.h" #include "index.h"
#include "qworker.h" #include "qworker.h"
#include "tstream.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
} }
indexInit(tsNumOfCommitThreads); indexInit(tsNumOfCommitThreads);
streamMetaInit();
dmReportStartup("dnode-transport", "initialized"); dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode); dDebug("dnode is created, ptr:%p", pDnode);
@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer(pDnode); dmCleanupServer(pDnode);
dmClearVars(pDnode); dmClearVars(pDnode);
rpcCleanup(); rpcCleanup();
streamMetaCleanup();
indexCleanup(); indexCleanup();
taosConvDestroy(); taosConvDestroy();
dDebug("dnode is closed, ptr:%p", pDnode); dDebug("dnode is closed, ptr:%p", pDnode);

View File

@ -846,7 +846,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if (err != NULL) { if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
// return -1;
} }
} }
pState->pTdbState->rocksdb = handle->db; pState->pTdbState->rocksdb = handle->db;

View File

@ -19,6 +19,13 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
static int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { taosCloseRef(streamBackendId); }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1; int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -85,8 +92,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
pMeta->streamBackend = streamBackendInit(statePath); pMeta->streamBackend = streamBackendInit(statePath);
pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);
taosMemoryFree(statePath); taosMemoryFree(statePath);
@ -129,9 +135,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
// streamBackendCleanup(pMeta->streamBackend);
taosCloseRef(pMeta->streamBackendId);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);

View File

@ -1,5 +1,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "streamBackendRocksdb.h"
#include "tstream.h"
#include "tstreamUpdate.h" #include "tstreamUpdate.h"
#include "ttime.h" #include "ttime.h"
@ -9,21 +11,16 @@ using namespace std;
class StreamStateEnv : public ::testing::Test { class StreamStateEnv : public ::testing::Test {
protected: protected:
virtual void SetUp() { virtual void SetUp() {
// initLog(); streamMetaInit();
// taosRemoveDir(path); backend = streamBackendInit(path);
// SIndexOpts opts;
// opts.cacheSize = 1024 * 1024 * 4;
// int ret = indexOpen(&opts, path, &index);
// assert(ret == 0);
} }
virtual void TearDown() { virtual void TearDown() {
streamMetaCleanup();
// indexClose(index); // indexClose(index);
} }
const char *path = TD_TMP_DIR_PATH "stream"; const char *path = TD_TMP_DIR_PATH "stream";
void *backend;
// SIndexOpts* opts;
// SIndex* index;
}; };
bool equalSBF(SScalableBf *left, SScalableBf *right) { bool equalSBF(SScalableBf *left, SScalableBf *right) {
@ -212,7 +209,7 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU7); // updateInfoDestroy(pSU7);
} }
// TEST() // TEST()
TEST_F(StreamStateEnv, test1) {} TEST(StreamStateEnv, test1) {}
// int main(int argc, char *argv[]) { // int main(int argc, char *argv[]) {
// testing::InitGoogleTest(&argc, argv); // testing::InitGoogleTest(&argc, argv);
// return RUN_ALL_TESTS(); // return RUN_ALL_TESTS();