From f53d458b61d9ce12989d952823e8b2523f59e28c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Jan 2025 16:09:32 +0800 Subject: [PATCH] fix invalid lock --- source/libs/stream/src/streamMeta.c | 36 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3955343fdb..0de256d86d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -253,7 +253,7 @@ _EXIT: int32_t len = strlen(pMeta->path) + 32; char* state = taosMemoryCalloc(1, len); if (state != NULL) { - (void) snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); taosRemoveDir(state); taosMemoryFree(state); } else { @@ -380,7 +380,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, char* tpath = taosMemoryCalloc(1, len); TSDB_CHECK_NULL(tpath, code, lino, _err, terrno); - (void) snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream"); + (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream"); pMeta->path = tpath; code = streamMetaOpenTdb(pMeta); @@ -392,6 +392,22 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, TSDB_CHECK_CODE(code, lino, _err); } + // set the attribute when running on Linux OS + TdThreadRwlockAttr attr; + code = taosThreadRwlockAttrInit(&attr); + TSDB_CHECK_CODE(code, lino, _err); + +#ifdef LINUX + code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + TSDB_CHECK_CODE(code, lino, _err); +#endif + + code = taosThreadRwlockInit(&pMeta->lock, &attr); + TSDB_CHECK_CODE(code, lino, _err); + + code = taosThreadRwlockAttrDestroy(&attr); + TSDB_CHECK_CODE(code, lino, _err); + if ((code = streamMetaBegin(pMeta) < 0)) { stError("vgId:%d begin trans for stream meta failed", pMeta->vgId); goto _err; @@ -431,22 +447,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); - // set the attribute when running on Linux OS - TdThreadRwlockAttr attr; - code = taosThreadRwlockAttrInit(&attr); - TSDB_CHECK_CODE(code, lino, _err); - -#ifdef LINUX - code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); - TSDB_CHECK_CODE(code, lino, _err); -#endif - - code = taosThreadRwlockInit(&pMeta->lock, &attr); - TSDB_CHECK_CODE(code, lino, _err); - - code = taosThreadRwlockAttrDestroy(&attr); - TSDB_CHECK_CODE(code, lino, _err); - code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); TSDB_CHECK_CODE(code, lino, _err);