diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index bed0f79f02..9bfec5577c 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -79,6 +79,8 @@ typedef struct { TdThreadRwlock chkpDirLock; int64_t dataWritten; + void* pMeta; + } STaskDbWrapper; typedef struct SDbChkp { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 0df36ec391..f709741b57 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -87,10 +87,10 @@ struct SStreamQueue { int8_t status; }; -extern void* streamTimer; -extern int32_t streamBackendId; -extern int32_t streamBackendCfWrapperId; -extern int32_t taskDbWrapperId; +extern void* streamTimer; +extern int32_t streamBackendId; +extern int32_t streamBackendCfWrapperId; +extern int32_t taskDbWrapperId; void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); @@ -156,6 +156,8 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +void streamMetaRemoveDB(void* arg, char* key); + typedef enum UPLOAD_TYPE { UPLOAD_DISABLE = -1, UPLOAD_S3 = 0, diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 630650025d..5d2c7ec504 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -982,8 +982,10 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI } int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { SStreamMeta* pMeta = arg; - void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL); - int32_t code = 0; + + taosThreadMutexLock(&pMeta->backendMutex); + void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL); + int32_t code = 0; while (pIter) { STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; @@ -1000,6 +1002,8 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { taosArrayPush(pSnap, &snap); pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); } + taosThreadMutexUnlock(&pMeta->backendMutex); + return code; } int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) { @@ -1810,6 +1814,10 @@ STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { void taskDbDestroy(void* pDb, bool flush) { STaskDbWrapper* wrapper = pDb; + if (wrapper == NULL) return; + + streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr); + qDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 23cb6f5a35..254b6de5b8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -250,9 +250,11 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { taskDbAddRef(*ppBackend); STaskDbWrapper* pBackend = *ppBackend; + pBackend->pMeta = pMeta; pTask->backendRefId = pBackend->refId; pTask->pBackend = pBackend; + taosThreadMutexUnlock(&pMeta->backendMutex); stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); @@ -270,6 +272,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { pTask->pBackend = pBackend; pBackend->refId = tref; pBackend->pTask = pTask; + pBackend->pMeta = pMeta; taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); @@ -277,6 +280,15 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); return 0; } +void streamMetaRemoveDB(void* arg, char* key) { + if (arg == NULL || key == NULL) return; + + SStreamMeta* pMeta = arg; + taosThreadMutexLock(&pMeta->backendMutex); + taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key)); + + taosThreadMutexUnlock(&pMeta->backendMutex); +} SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -794,7 +806,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { } int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { - TBC* pCur = NULL; + TBC* pCur = NULL; void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL;