wal add sync info

This commit is contained in:
Liu Jicong 2022-03-26 15:36:49 +08:00
parent 25344869a9
commit 7bd71bcb81
3 changed files with 41 additions and 29 deletions

View File

@ -61,30 +61,41 @@ extern "C" {
} \ } \
} }
#define WAL_HEAD_VER 0 #define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20 #define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log" #define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx" #define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1 #define WAL_CUR_FAILED 1
#pragma pack(push, 1) #pragma pack(push, 1)
typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWalType; typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWalType;
// used by sync module
typedef struct {
int8_t isWeek;
uint64_t seqNum;
uint64_t term;
} SSyncInfo;
typedef struct SWalReadHead { typedef struct SWalReadHead {
int8_t headVer; int8_t headVer;
int16_t msgType;
int8_t reserved; int8_t reserved;
int16_t msgType;
int32_t len; int32_t len;
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
int64_t version; int64_t version;
char body[];
// sync info
SSyncInfo syncInfo;
char body[];
} SWalReadHead; } SWalReadHead;
typedef struct { typedef struct {
@ -117,16 +128,16 @@ typedef struct SWal {
SWalCfg cfg; SWalCfg cfg;
int32_t fsyncSeq; int32_t fsyncSeq;
// meta // meta
SWalVer vers; SWalVer vers;
TdFilePtr pWriteLogTFile; TdFilePtr pWriteLogTFile;
TdFilePtr pWriteIdxTFile; TdFilePtr pWriteIdxTFile;
int32_t writeCur; int32_t writeCur;
SArray *fileInfoSet; SArray *fileInfoSet;
// status // status
int64_t totSize; int64_t totSize;
int64_t lastRollSeq; int64_t lastRollSeq;
// ctl // ctl
int64_t refId; int64_t refId;
TdThreadMutex mutex; TdThreadMutex mutex;
// path // path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
@ -158,6 +169,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *); void walClose(SWal *);
// write // write
int64_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SSyncInfo info, const void *body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); void walFsync(SWal *, bool force);

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -253,7 +253,8 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0; return 0;
} }
int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncInfo syncInfo, const void *body,
int32_t bodyLen) {
if (pWal == NULL) return -1; if (pWal == NULL) return -1;
int code = 0; int code = 0;
@ -296,6 +297,10 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
int64_t offset = walGetCurFileOffset(pWal); int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.len = bodyLen; pWal->writeHead.head.len = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
// sync info
pWal->writeHead.head.syncInfo = syncInfo;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
@ -332,6 +337,15 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
return 0; return 0;
} }
int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
SSyncInfo info = {
.isWeek = -1,
.seqNum = UINT64_MAX,
.term = UINT64_MAX,
};
return walWriteWithSyncInfo(pWal, index, msgType, info, body, bodyLen);
}
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));