diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ff97eac8e2..57abd16f1d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -31,18 +31,28 @@ int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; int32_t streamMetaId = 0; +static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); +static void metaHbToMnode(void* param, void* tmrId); +static void streamMetaClear(SStreamMeta* pMeta); +static int32_t streamMetaBegin(SStreamMeta* pMeta); +static void streamMetaCloseImpl(void* arg); +static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask); + typedef struct { TdThreadMutex mutex; SHashObj* pTable; -} SGStreamMetaMgt; +} SMetaRefMgt; -SGStreamMetaMgt gStreamMetaMgt; -static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); -static void metaHbToMnode(void* param, void* tmrId); -static void streamMetaClear(SStreamMeta* pMeta); -static int32_t streamMetaBegin(SStreamMeta* pMeta); -static void streamMetaCloseImpl(void* arg); -static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask); +SMetaRefMgt gMetaRefMgt; + +void metaRefMgtInit(); +void metaRefMgtCleanup(); +int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); + +void metaRefMgtInit() { + taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL); + gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); +} static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); @@ -50,8 +60,7 @@ static void streamMetaEnvInit() { streamMetaId = taosOpenRef(64, streamMetaCloseImpl); - taosThreadMutexInit(&(gStreamMetaMgt.mutex), NULL); - gStreamMetaMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + metaRefMgtInit(); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } @@ -60,9 +69,11 @@ void streamMetaCleanup() { taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamMetaId); - taosThreadMutexDestroy(&gStreamMetaMgt.mutex); + metaRefMgtCleanup(); +} - void* pIter = taosHashIterate(gStreamMetaMgt.pTable, NULL); +void metaRefMgtCleanup() { + void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL); while (pIter) { SArray* list = *(SArray**)pIter; for (int i = 0; i < taosArrayGetSize(list); i++) { @@ -70,23 +81,25 @@ void streamMetaCleanup() { taosMemoryFree(rid); } taosArrayDestroy(list); - pIter = taosHashIterate(gStreamMetaMgt.pTable, pIter); + pIter = taosHashIterate(gMetaRefMgt.pTable, pIter); } - taosHashCleanup(gStreamMetaMgt.pTable); + taosHashCleanup(gMetaRefMgt.pTable); + + taosThreadMutexDestroy(&gMetaRefMgt.mutex); } -int32_t streamMetaAddRidToGlobalMgt(int64_t vgId, int64_t* rid) { - taosThreadMutexLock(&gStreamMetaMgt.mutex); - void* p = taosHashGet(gStreamMetaMgt.pTable, &vgId, sizeof(vgId)); +int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { + taosThreadMutexLock(&gMetaRefMgt.mutex); + void* p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId)); if (p == NULL) { SArray* list = taosArrayInit(8, sizeof(void*)); taosArrayPush(list, &rid); - taosHashPut(gStreamMetaMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); + taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*)); } else { SArray* list = *(SArray**)p; taosArrayPush(list, &rid); } - taosThreadMutexUnlock(&gStreamMetaMgt.mutex); + taosThreadMutexUnlock(&gMetaRefMgt.mutex); return 0; } @@ -143,7 +156,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); *pRid = pMeta->rid; - streamMetaAddRidToGlobalMgt(pMeta->vgId, pRid); + metaRefMgtAdd(pMeta->vgId, pRid); pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->hbInfo.tickCounter = 0; @@ -333,7 +346,8 @@ void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) { int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { - qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], tstrerror(terrno)); + qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], + tstrerror(terrno)); } else { qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]); } @@ -728,9 +742,9 @@ void metaHbToMnode(void* param, void* tmrId) { hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + int64_t keys[2] = {pId->streamId, pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if ((*pTask)->info.fillHistory == 1) { continue; @@ -834,7 +848,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; - while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { + while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { taosMsleep(100); qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); }