fix/remove-sync-heartbeat-lock

This commit is contained in:
dmchen 2025-02-26 16:55:06 +08:00
parent ac056f153f
commit bd492f790e
2 changed files with 14 additions and 3 deletions

View File

@ -40,6 +40,7 @@ typedef struct SSyncLogReplMgr {
int32_t retryBackoff; int32_t retryBackoff;
int32_t peerId; int32_t peerId;
int32_t sendCount; int32_t sendCount;
TdThreadMutex mutex;
} SSyncLogReplMgr; } SSyncLogReplMgr;
typedef struct SSyncLogBufEntry { typedef struct SSyncLogBufEntry {

View File

@ -1140,26 +1140,29 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
(void)taosThreadMutexLock(&pBuf->mutex); (void)taosThreadMutexLock(&pMgr->mutex);
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime; pMgr->peerStartTime = pMsg->startTime;
} }
(void)taosThreadMutexUnlock(&pBuf->mutex); (void)taosThreadMutexUnlock(&pMgr->mutex);
return 0; return 0;
} }
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
(void)taosThreadMutexLock(&pBuf->mutex); (void)taosThreadMutexLock(&pMgr->mutex);
if (pMsg->startTime != pMgr->peerStartTime) { if (pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime; pMgr->peerStartTime = pMsg->startTime;
} }
(void)taosThreadMutexUnlock(&pMgr->mutex);
(void)taosThreadMutexLock(&pBuf->mutex);
int32_t code = 0; int32_t code = 0;
if (pMgr->restored) { if (pMgr->restored) {
@ -1324,6 +1327,12 @@ SSyncLogReplMgr* syncLogReplCreate() {
return NULL; return NULL;
} }
int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
if (code) {
terrno = code;
return NULL;
}
return pMgr; return pMgr;
} }
@ -1331,6 +1340,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) { if (pMgr == NULL) {
return; return;
} }
taosThreadMutexDestroy(&pMgr->mutex);
taosMemoryFree(pMgr); taosMemoryFree(pMgr);
return; return;
} }