Merge pull request #8860 from taosdata/feature/tq

refactor wal
This commit is contained in:
Liu Jicong 2021-11-30 16:44:50 +08:00 committed by GitHub
commit e123467221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 152 additions and 92 deletions

View File

@ -18,6 +18,7 @@
#include "os.h"
#include "tdef.h"
#include "tlog.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -39,12 +40,14 @@ typedef enum {
typedef struct {
int8_t sver;
int8_t reserved[3];
uint8_t msgType;
int8_t reserved[2];
int32_t len;
int64_t version;
uint32_t signature;
uint32_t cksum;
char cont[];
uint32_t cksumHead;
uint32_t cksumBody;
//char cont[];
} SWalHead;
typedef struct {
@ -54,16 +57,20 @@ typedef struct {
} SWalCfg;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_READ_ONLY 1
#define WAL_CUR_FILE_READ_ONLY 2
@ -94,6 +101,10 @@ typedef struct SWal {
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
//file set
SArray* fileSet;
//reusable write head
SWalHead head;
} SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
@ -104,21 +115,20 @@ void walCleanUp();
// handle open and ctl
SWal *walOpen(const char *path, SWalCfg *pCfg);
void walStop(SWal *pWal);
int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
// write
//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
// apis for lifecycle management
void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous log can be pruned safely
// notify that previous logs can be pruned safely
int32_t walTakeSnapshot(SWal *, int64_t ver);
//int32_t walDataCorrupted(SWal*);
@ -131,11 +141,6 @@ int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
//internal
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
#ifdef __cplusplus
}
#endif

View File

@ -20,16 +20,42 @@
#include "tfile.h"
#include "walInt.h"
int walSetCurVerImpl(SWal *pWal, int64_t ver) {
int walSeekVerImpl(SWal *pWal, int64_t ver) {
//close old file
//iterate all files
//open right file
int code = 0;
code = tfClose(pWal->curLogTfd);
if(code != 0) {
//TODO
}
code = tfClose(pWal->curIdxTfd);
if(code != 0) {
//TODO
}
//bsearch in fileSet
int fName = 0;//TODO
//open the right file
char fNameStr[WAL_FILE_LEN];
sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName);
bool closed = 1; //TODO:read only
int64_t idxTfd = tfOpenReadWrite(fNameStr);
sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName);
int64_t logTfd = tfOpenReadWrite(fNameStr);
//seek position
int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE;
tfLseek(idxTfd, offset, SEEK_SET);
//set cur version, cur file version and cur status
return 0;
pWal->curFileFirstVersion = fName;
pWal->curFileLastVersion = 1;//TODO
pWal->curLogTfd = logTfd;
pWal->curIdxTfd = idxTfd;
pWal->curVersion = ver;
pWal->curOffset = offset;
pWal->curStatus = 0;//TODO
return code;
}
int walSetCurVer(SWal *pWal, int64_t ver) {
if(ver > pWal->lastVersion + 1) {
int walSeekVer(SWal *pWal, int64_t ver) {
if(ver > pWal->lastVersion) {
//TODO: some records are skipped
return -1;
}
@ -40,13 +66,19 @@ int walSetCurVer(SWal *pWal, int64_t ver) {
if(ver < pWal->snapshotVersion) {
//TODO: seek snapshotted log
}
if(ver >= pWal->curFileFirstVersion
&& ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) {
}
if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
int index = 0;
index = 1;
//back up to avoid inconsistency
int64_t curVersion = pWal->curVersion;
int64_t curOffset = pWal->curOffset;
int64_t curFileFirstVersion = pWal->curFileFirstVersion;
int64_t curFileLastVersion = pWal->curFileLastVersion;
if(walSetCurVerImpl(pWal, ver) < 0) {
if(walSeekVerImpl(pWal, ver) < 0) {
//TODO: errno
pWal->curVersion = curVersion;
pWal->curOffset = curOffset;
@ -67,7 +99,7 @@ int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
if(pWal->curVersion != ver) {
if(walSetCurVer(pWal, ver) != 0) {
if(walSeekVer(pWal, ver) != 0) {
//TODO: some records are skipped
return -1;
}

View File

@ -18,8 +18,17 @@
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "compare.h"
#include "walInt.h"
//internal
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER;
static int8_t walInited = 0;
typedef struct {
int32_t refSetId;
int32_t seq;
@ -35,11 +44,21 @@ static int32_t walInitObj(SWal *pWal);
static void walFreeObj(void *pWal);
int32_t walInit() {
//TODO: change to atomic
pthread_mutex_lock(&walInitLock);
if(walInited) {
pthread_mutex_unlock(&walInitLock);
return 0;
} else {
walInited = 1;
pthread_mutex_unlock(&walInitLock);
}
int32_t code = 0;
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
code = pthread_mutex_init(&tsWal.mutex, NULL);
if (code) {
if (code != 0) {
wError("failed to init wal mutex since %s", tstrerror(code));
return code;
}
@ -61,6 +80,27 @@ void walCleanUp() {
wInfo("wal module is cleaned up");
}
static int walLoadFileset(SWal *pWal) {
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
while ((ent = readdir(dir)) != NULL) {
char *name = ent->d_name;
name[WAL_NOSUFFIX_LEN] = 0;
//validate file name by regex matching
if(1 /* regex match */) {
int64_t fnameInt64 = atoll(name);
taosArrayPush(pWal->fileSet, &fnameInt64);
}
}
taosArraySort(pWal->fileSet, compareInt64Val);
return 0;
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = malloc(sizeof(SWal));
if (pWal == NULL) {
@ -70,9 +110,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->vgId = pCfg->vgId;
pWal->curLogTfd = -1;
/*pWal->curFileId = -1;*/
pWal->curIdxTfd = -1;
pWal->level = pCfg->walLevel;
pWal->fsyncPeriod = pCfg->fsyncPeriod;
memset(&pWal->head, 0, sizeof(SWalHead));
pWal->head.sver = 0;
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
@ -129,6 +173,11 @@ static int32_t walInitObj(SWal *pWal) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
pWal->fileSet = taosArrayInit(0, sizeof(int64_t));
if(pWal->fileSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
wDebug("vgId:%d, object is initialized", pWal->vgId);
return 0;

View File

@ -14,18 +14,13 @@
*/
#include "wal.h"
#include "tchecksum.h"
int32_t walCommit(SWal *pWal, int64_t ver) {
return 0;
static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) {
return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) &&
taosCheckChecksum(body, bodyLen, pHead->cksumBody);
}
int32_t walRollback(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
return 0;
@ -36,13 +31,16 @@ int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t r
}
int64_t walGetFirstVer(SWal *pWal) {
return 0;
if (pWal == NULL) return 0;
return pWal->firstVersion;
}
int64_t walGetSnapshotVer(SWal *pWal) {
return 0;
int64_t walGetSnaphostVer(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->snapshotVersion;
}
int64_t walGetLastVer(SWal *pWal) {
return 0;
if (pWal == NULL) return 0;
return pWal->lastVersion;
}

View File

@ -21,6 +21,18 @@
#include "tfile.h"
#include "walInt.h"
int32_t walCommit(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walRollback(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
return 0;
}
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
@ -112,60 +124,40 @@ void walRemoveAllOldFiles(void *handle) {
}
#endif
static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 2;
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len);
}
static int walValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else if (pHead->sver >= 1) {
uint32_t cksum = pHead->cksum;
pHead->cksum = 0;
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
}
return 0;
}
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
if (pWal == NULL) return -1;
SWalHead *pHead = malloc(sizeof(SWalHead) + bodyLen);
if(pHead == NULL) {
return -1;
}
pHead->version = index;
int32_t code = 0;
// no wal
if (!tfValid(pWal->curLogTfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->curVersion) return 0;
if (index > pWal->lastVersion + 1) return -1;
pHead->signature = WAL_SIGNATURE;
pHead->len = bodyLen;
memcpy(pHead->cont, body, bodyLen);
pWal->head.version = index;
int32_t code = 0;
walUpdateChecksum(pHead);
pWal->head.signature = WAL_SIGNATURE;
pWal->head.len = bodyLen;
pWal->head.msgType = msgType;
int32_t contLen = pHead->len + sizeof(SWalHead);
pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2);
pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)&body, bodyLen);
pthread_mutex_lock(&pWal->mutex);
if (tfWrite(pWal->curLogTfd, pHead, contLen) != contLen) {
if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
} else {
/*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/
/*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/
pWal->curVersion = pHead->version;
}
pthread_mutex_unlock(&pWal->mutex);
if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
//TODO:write idx
ASSERT(contLen == pHead->len + sizeof(SWalHead));
pthread_mutex_unlock(&pWal->mutex);
return code;
}
@ -254,6 +246,7 @@ static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
tfFsync(tfd);
}
#if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
@ -387,21 +380,4 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
return code;
}
uint64_t walGetVersion(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->curVersion;
}
// Wal version in slave (dnode1) must be reset.
// Because after the data file is recovered from peer (dnode2), the new file version in dnode1 may become smaller than origin.
// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent,
// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur
void walResetVersion(SWal *pWal, uint64_t newVer) {
if (pWal == NULL) return;
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->curVersion, newVer);
pWal->curVersion = newVer;
}
#endif