diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 19247166b3..630650025d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1631,6 +1631,7 @@ void* taskDbAddRef(void* pTaskDb) { return taosAcquireRef(taskDbWrapperId, pBackend->refId); } void taskDbRemoveRef(void* pTaskDb) { + if (pTaskDb == NULL) return; STaskDbWrapper* pBackend = pTaskDb; taosReleaseRef(taskDbWrapperId, pBackend->refId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0e137a673e..bd23e41a84 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -255,6 +255,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { pTask->backendRefId = pBackend->refId; pTask->pBackend = pBackend; taosThreadMutexUnlock(&pMeta->backendMutex); + + stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); return 0; } @@ -272,6 +274,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); + + stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); return 0; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {