fix(stream): fix race condition in handling the hbMsg rsp.

This commit is contained in:
Haojun Liao 2024-07-08 16:57:13 +08:00
parent 7cfdf0c14d
commit de7e25f259
1 changed files with 2 additions and 2 deletions

View File

@ -332,7 +332,7 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId); stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
SMetaHbInfo* pInfo = pMeta->pHbInfo; SMetaHbInfo* pInfo = pMeta->pHbInfo;
streamMetaRLock(pMeta); streamMetaWLock(pMeta);
// current waiting rsp recved // current waiting rsp recved
if (pRsp->msgId == pInfo->hbCount) { if (pRsp->msgId == pInfo->hbCount) {
@ -345,6 +345,6 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
} }
streamMetaRUnLock(pMeta); streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }