From 5c1ff76082d393fae04374e09e679004a07dc7ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Dec 2023 00:58:52 +0800 Subject: [PATCH] fix(stream): set rwlock attr. --- source/libs/stream/src/streamMeta.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index eebde87b28..25f0fbf991 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -375,7 +375,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF stage); pMeta->rid = taosAddRef(streamMetaId, pMeta); - taosThreadRwlockInit(&pMeta->lock, NULL); + + TdThreadRwlockAttr attr; + taosThreadRwlockAttrInit(&attr); + + pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + taosThreadRwlockInit(&pMeta->lock, &attr); + taosThreadRwlockAttrDestroy(&attr); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); @@ -1386,6 +1392,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; + streamMetaWUnLock(pMeta); if (isLeader) { stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, @@ -1395,8 +1402,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId, prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing); } - - streamMetaWUnLock(pMeta); } int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {