refactor(sync): hold wal handle with log store

This commit is contained in:
Minghao Li 2022-06-23 14:58:52 +08:00
parent cdb6b2429a
commit 00c0926c44
4 changed files with 60 additions and 34 deletions

View File

@ -26,10 +26,12 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftEntry.h" #include "syncRaftEntry.h"
#include "taosdef.h" #include "taosdef.h"
#include "wal.h"
typedef struct SSyncLogStoreData { typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
SWal* pWal; SWal* pWal;
SWalReadHandle* pWalHandle;
SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0 SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData; } SSyncLogStoreData;

View File

@ -1311,6 +1311,10 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg)); char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
char* printStr = "";
if (pCfgStr != NULL) {
printStr = pCfgStr;
}
if (userStrLen < 256) { if (userStrLen < 256) {
char logBuf[256 + 256]; char logBuf[256 + 256];
@ -1322,7 +1326,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pCfgStr); pSyncNode->changing, printStr);
} else { } else {
snprintf(logBuf, sizeof(logBuf), "%s", str); snprintf(logBuf, sizeof(logBuf), "%s", str);
} }
@ -1339,7 +1343,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pCfgStr); pSyncNode->changing, printStr);
} else { } else {
snprintf(s, len, "%s", str); snprintf(s, len, "%s", str);
} }

View File

@ -107,6 +107,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
} }
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512; int32_t len = 512;
char *s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
@ -127,6 +128,9 @@ char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
strcpy(p - 2, "}"); strcpy(p - 2, "}");
return s; return s;
}
return NULL;
} }
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {

View File

@ -16,7 +16,6 @@
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n] // refactor, log[0 .. n] ==> log[m .. n]
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
@ -233,7 +232,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL; *ppEntry = NULL;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) { if (pWalHandle == NULL) {
return -1; return -1;
} }
@ -256,9 +256,11 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
} }
} while (0); } while (0);
/*
int32_t saveErr = terrno; int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
terrno = saveErr; terrno = saveErr;
*/
return code; return code;
} }
@ -274,9 +276,11 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
/*
int32_t saveErr = terrno; int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
terrno = saveErr; terrno = saveErr;
*/
return code; return code;
} }
@ -319,6 +323,10 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode; pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal; pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal);
ASSERT(pData->pWalHandle != NULL);
SyncIndex firstVer = walGetFirstVer(pData->pWal); SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal); SyncIndex lastVer = walGetLastVer(pData->pWal);
@ -357,6 +365,11 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
void logStoreDestory(SSyncLogStore* pLogStore) { void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) { if (pLogStore != NULL) {
SSyncLogStoreData* pData = pLogStore->data;
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
}
taosMemoryFree(pLogStore->data); taosMemoryFree(pLogStore->data);
taosMemoryFree(pLogStore); taosMemoryFree(pLogStore);
} }
@ -405,7 +418,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL); ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index); int32_t code = walReadWithHandle(pWalHandle, index);
@ -441,9 +455,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
/*
int32_t saveErr = terrno; int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
terrno = saveErr; terrno = saveErr;
*/
return pEntry; return pEntry;