fix(stream): set rwlock attr.
This commit is contained in:
parent
66bdace9eb
commit
5c1ff76082
|
@ -375,7 +375,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
stage);
|
stage);
|
||||||
|
|
||||||
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
||||||
taosThreadRwlockInit(&pMeta->lock, NULL);
|
|
||||||
|
TdThreadRwlockAttr attr;
|
||||||
|
taosThreadRwlockAttrInit(&attr);
|
||||||
|
|
||||||
|
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
|
||||||
|
taosThreadRwlockInit(&pMeta->lock, &attr);
|
||||||
|
taosThreadRwlockAttrDestroy(&attr);
|
||||||
|
|
||||||
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
|
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
|
||||||
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
|
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
|
||||||
|
@ -1386,6 +1392,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
|
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (isLeader) {
|
if (isLeader) {
|
||||||
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
|
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
|
||||||
|
@ -1395,8 +1402,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
||||||
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
||||||
prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
|
prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
|
|
Loading…
Reference in New Issue