diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9b0673a9f5..47f3047364 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -397,7 +397,7 @@ typedef struct SStreamMeta { SMetaHbInfo hbInfo; int32_t closedTask; int32_t chkptNotReadyTasks; - int64_t rid; + int64_t rid; int64_t chkpId; SArray* chkpSaved; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fc722fa12c..2da669ec46 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -30,16 +30,26 @@ int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; int32_t streamMetaId = 0; -static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); -static void metaHbToMnode(void* param, void* tmrId); -static void streamMetaClear(SStreamMeta* pMeta); +typedef struct { + TdThreadMutex mutex; + SHashObj* pTable; +} SGStreamMetaMgt; + +SGStreamMetaMgt gStreamMetaMgt; +static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); +static void metaHbToMnode(void* param, void* tmrId); +static void streamMetaClear(SStreamMeta* pMeta); + +void streamMetaCloseImpl(void* arg); -void streamMetaCloseImpl(void* arg); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); streamMetaId = taosOpenRef(64, streamMetaCloseImpl); + + taosThreadMutexInit(&(gStreamMetaMgt.mutex), NULL); + gStreamMetaMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } @@ -47,6 +57,35 @@ void streamMetaCleanup() { taosCloseRef(streamBackendId); taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamMetaId); + + taosThreadMutexDestroy(&gStreamMetaMgt.mutex); + + void* pIter = taosHashIterate(gStreamMetaMgt.pTable, NULL); + while (pIter) { + SArray* list = *(SArray**)pIter; + for (int i = 0; i < taosArrayGetSize(list); i++) { + void* rid = taosArrayGetP(list, i); + taosMemoryFree(rid); + } + taosArrayDestroy(list); + pIter = taosHashIterate(gStreamMetaMgt.pTable, pIter); + } + taosHashCleanup(gStreamMetaMgt.pTable); +} + +int32_t streamMetaAddRidToGlobalMgt(int64_t vgId, int64_t* rid) { + taosThreadMutexLock(&gStreamMetaMgt.mutex); + void* p = taosHashGet(gStreamMetaMgt.pTable, &vgId, sizeof(vgId)); + if (p == NULL) { + SArray* list = taosArrayInit(8, sizeof(void*)); + taosArrayPush(list, &rid); + taosHashPut(gStreamMetaMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); + } else { + SArray* list = *(SArray**)p; + taosArrayPush(list, &rid); + } + taosThreadMutexUnlock(&gStreamMetaMgt.mutex); + return 0; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { @@ -102,6 +141,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); *pRid = pMeta->rid; + streamMetaAddRidToGlobalMgt(pMeta->vgId, pRid); + pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.stopFlag = 0; @@ -214,6 +255,12 @@ void streamMetaClear(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) { qDebug("start to close stream meta"); + // int64_t rid = *(int64_t*)pMeta->pRid; + // if (taosTmrStop(pMeta->hbInfo.hbTmr)) { + // taosMemoryFree(pMeta->pRid); + // } else { + // // do nothing, stop by timer thread + // } taosRemoveRef(streamMetaId, pMeta->rid); } @@ -641,7 +688,7 @@ void metaHbToMnode(void* param, void* tmrId) { SStreamHbMsg hbMsg = {0}; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { - taosMemoryFree(param); + // taosMemoryFree(param); return; }