Merge pull request #14769 from taosdata/feature/3.0_mhli
refactor(sync): modify append log
This commit is contained in:
commit
a52c089d82
|
@ -201,6 +201,43 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
|
||||||
return SYNC_TERM_INVALID;
|
return SYNC_TERM_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
|
||||||
|
SyncIndex index = 0;
|
||||||
|
SWalSyncInfo syncMeta;
|
||||||
|
syncMeta.isWeek = pEntry->isWeak;
|
||||||
|
syncMeta.seqNum = pEntry->seqNum;
|
||||||
|
syncMeta.term = pEntry->term;
|
||||||
|
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
||||||
|
if (index < 0) {
|
||||||
|
int32_t err = terrno;
|
||||||
|
const char* errStr = tstrerror(err);
|
||||||
|
int32_t sysErr = errno;
|
||||||
|
const char* sysErrStr = strerror(errno);
|
||||||
|
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
|
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
||||||
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pEntry->index = index;
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
|
||||||
|
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
||||||
|
syncNodeEventLog(pData->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
@ -243,6 +280,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// entry found, return 0
|
// entry found, return 0
|
||||||
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||||
|
@ -361,6 +399,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
|
||||||
|
|
||||||
//-------------------------------
|
//-------------------------------
|
||||||
// log[0 .. n]
|
// log[0 .. n]
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
@ -397,6 +437,44 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
|
||||||
|
SyncIndex index = 0;
|
||||||
|
SWalSyncInfo syncMeta;
|
||||||
|
syncMeta.isWeek = pEntry->isWeak;
|
||||||
|
syncMeta.seqNum = pEntry->seqNum;
|
||||||
|
syncMeta.term = pEntry->term;
|
||||||
|
|
||||||
|
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
||||||
|
if (index < 0) {
|
||||||
|
int32_t err = terrno;
|
||||||
|
const char* errStr = tstrerror(err);
|
||||||
|
int32_t sysErr = errno;
|
||||||
|
const char* sysErrStr = strerror(errno);
|
||||||
|
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
|
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
||||||
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pEntry->index = index;
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
|
||||||
|
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
||||||
|
syncNodeEventLog(pData->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
|
Loading…
Reference in New Issue