fix: use recursive mutex for relocking ring log buffer in syncNodeDoConfigChange

This commit is contained in:
Benguang Zhao 2022-11-29 20:21:33 +08:00
parent 470441c541
commit decb17fcb1
5 changed files with 19 additions and 3 deletions

View File

@ -55,6 +55,7 @@ typedef struct SSyncLogBuffer {
int64_t endIndex; int64_t endIndex;
int64_t size; int64_t size;
TdThreadMutex mutex; TdThreadMutex mutex;
TdThreadMutexAttr attr;
} SSyncLogBuffer; } SSyncLogBuffer;
// SSyncLogRepMgr // SSyncLogRepMgr

View File

@ -84,6 +84,7 @@ void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
} }
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
ASSERT(false && "deprecated");
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("pSyncNode is NULL"); sError("pSyncNode is NULL");
return; return;

View File

@ -1602,7 +1602,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode); syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // syncMaybeAdvanceCommitIndex(pSyncNode);
} else { } else {
syncNodeBecomeFollower(pSyncNode, tmpbuf); syncNodeBecomeFollower(pSyncNode, tmpbuf);

View File

@ -916,11 +916,24 @@ SSyncLogBuffer* syncLogBufferCreate() {
ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
if (taosThreadMutexInit(&pBuf->mutex, NULL) < 0) { if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
sError("failed to init log buffer mutexattr due to %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
sError("failed to set log buffer mutexattr type due to %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
sError("failed to init log buffer mutex due to %s", strerror(errno)); sError("failed to init log buffer mutex due to %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
return pBuf; return pBuf;
_err: _err:
@ -947,6 +960,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
} }
syncLogBufferClear(pBuf); syncLogBufferClear(pBuf);
(void)taosThreadMutexDestroy(&pBuf->mutex); (void)taosThreadMutexDestroy(&pBuf->mutex);
(void)taosThreadMutexAttrDestroy(&pBuf->attr);
(void)taosMemoryFree(pBuf); (void)taosMemoryFree(pBuf);
return; return;
} }

View File

@ -49,7 +49,7 @@
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) { int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) {
ASSERT(false && "deplicated"); ASSERT(false && "deprecated");
// next index // next index
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);