fix(sync): fix AddressSanitizer error: TD-20372
This commit is contained in:
parent
c20627c1d6
commit
9b58176c58
|
@ -1832,6 +1832,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
SElectTimer* pElectTimer = param;
|
SElectTimer* pElectTimer = param;
|
||||||
SSyncNode* pNode = pElectTimer->pSyncNode;
|
SSyncNode* pNode = pElectTimer->pSyncNode;
|
||||||
|
|
||||||
|
if (pNode == NULL) return;
|
||||||
|
if (pNode->syncEqMsg == NULL) return;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
|
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
syncMeta.isWeek = pEntry->isWeak;
|
syncMeta.isWeek = pEntry->isWeak;
|
||||||
syncMeta.seqNum = pEntry->seqNum;
|
syncMeta.seqNum = pEntry->seqNum;
|
||||||
syncMeta.term = pEntry->term;
|
syncMeta.term = pEntry->term;
|
||||||
|
|
||||||
|
int64_t tsWriteBegin = taosGetTimestampMs();
|
||||||
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
||||||
|
int64_t tsWriteEnd = taosGetTimestampMs();
|
||||||
|
int64_t tsElapsed = tsWriteEnd - tsWriteBegin;
|
||||||
|
|
||||||
if (index < 0) {
|
if (index < 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
|
@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
}
|
}
|
||||||
pEntry->index = index;
|
pEntry->index = index;
|
||||||
|
|
||||||
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index,
|
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
|
||||||
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
|
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,7 +241,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
|
||||||
|
|
||||||
taosThreadMutexLock(&(pData->mutex));
|
taosThreadMutexLock(&(pData->mutex));
|
||||||
|
|
||||||
|
int64_t tsBegin = taosGetTimestampMs();
|
||||||
code = walReadVer(pWalHandle, index);
|
code = walReadVer(pWalHandle, index);
|
||||||
|
int64_t tsEnd = taosGetTimestampMs();
|
||||||
|
int64_t tsElapsed = tsEnd - tsBegin;
|
||||||
|
|
||||||
// code = walReadVerCached(pWalHandle, index);
|
// code = walReadVerCached(pWalHandle, index);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
|
|
|
@ -153,14 +153,16 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
||||||
|
|
||||||
// save index, otherwise pMsg will be free by rpc
|
// save index, otherwise pMsg will be free by rpc
|
||||||
SyncIndex saveLastSendIndex = pState->lastSendIndex;
|
SyncIndex saveLastSendIndex = pState->lastSendIndex;
|
||||||
|
bool update = false;
|
||||||
if (pMsg->dataLen > 0) {
|
if (pMsg->dataLen > 0) {
|
||||||
saveLastSendIndex = pMsg->prevLogIndex + 1;
|
saveLastSendIndex = pMsg->prevLogIndex + 1;
|
||||||
|
update = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
syncLogSendAppendEntries(pSyncNode, pMsg, "");
|
syncLogSendAppendEntries(pSyncNode, pMsg, "");
|
||||||
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
|
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
|
||||||
|
|
||||||
if (pMsg->dataLen > 0) {
|
if (update) {
|
||||||
pState->lastSendIndex = saveLastSendIndex;
|
pState->lastSendIndex = saveLastSendIndex;
|
||||||
pState->lastSendTime = taosGetTimestampMs();
|
pState->lastSendTime = taosGetTimestampMs();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue