diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 2be0330efc..b2ca8cfcf2 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -66,7 +66,7 @@ typedef struct { SSHashObj* parNameMap; int64_t checkPointId; int32_t taskId; - int32_t streamId; + int64_t streamId; } SStreamState; SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6618650648..104498b9fe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -347,6 +347,8 @@ typedef struct SStreamMeta { SRWLatch lock; int32_t walScanCounter; void* streamBackend; + int32_t streamBackendId; + int64_t streamBackendRid; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d8ad769741..fea9f4daa3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -617,10 +617,10 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c } int streamStateOpenBackend(void* backend, SStreamState* pState) { - qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId); + qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); SBackendHandle* handle = backend; - sprintf(pState->pTdbState->idstr, "%d-%d", pState->streamId, pState->taskId); + sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -671,12 +671,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); + qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); return 0; } void streamStateCloseBackend(SStreamState* pState, bool remove) { char* status[] = {"close", "drop"}; - qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId); + qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, + pState->taskId); if (pState->pTdbState->rocksdb == NULL) { return; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index cfe6852b18..0ec8f0458a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -16,6 +16,7 @@ #include "executor.h" #include "streamBackendRocksdb.h" #include "streamInc.h" +#include "tref.h" #include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { @@ -77,6 +78,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackend = streamBackendInit(statePath); + pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup); + pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend); + taosMemoryFree(statePath); taosInitRWLatch(&pMeta->lock); @@ -88,7 +92,7 @@ _err: if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); - if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend); + // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend); taosMemoryFree(pMeta); return NULL; } @@ -116,7 +120,9 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); - streamBackendCleanup(pMeta->streamBackend); + taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid); + // streamBackendCleanup(pMeta->streamBackend); + taosCloseRef(pMeta->streamBackendId); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 86056f94bb..437367c4b3 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -22,6 +22,7 @@ #include "tcoding.h" #include "tcommon.h" #include "tcompare.h" +#include "tref.h" #include "ttimer.h" #define MAX_TABLE_NAME_NUM 100000 @@ -92,6 +93,10 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { qWarn("open stream state, %s", path); + if (pTask == NULL) { + qWarn("failed to open stream state, %s", path); + return NULL; + } SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -115,8 +120,10 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int pState->streamId = pTask->id.streamId; #ifdef USE_ROCKSDB qWarn("open stream state1"); + taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); if (code == -1) { + taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); taosMemoryFree(pState); pState = NULL; } @@ -125,6 +132,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int pState->pFileState = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); pState->parNameMap = tSimpleHashInit(1024, hashFn); + return pState; #else @@ -213,6 +221,7 @@ _err: } void streamStateClose(SStreamState* pState, bool remove) { + SStreamTask* pTask = pState->pTdbState->pOwner; #ifdef USE_ROCKSDB // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); @@ -227,6 +236,7 @@ void streamStateClose(SStreamState* pState, bool remove) { tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); #endif + taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); } int32_t streamStateBegin(SStreamState* pState) {