diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 4d5b4d977a..e77540bf90 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -18,6 +18,7 @@ #include "os.h" #include "tdef.h" #include "tlog.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { #endif @@ -39,12 +40,14 @@ typedef enum { typedef struct { int8_t sver; - int8_t reserved[3]; + uint8_t msgType; + int8_t reserved[2]; int32_t len; int64_t version; uint32_t signature; - uint32_t cksum; - char cont[]; + uint32_t cksumHead; + uint32_t cksumBody; + //char cont[]; } SWalHead; typedef struct { @@ -54,16 +57,20 @@ typedef struct { } SWalCfg; #define WAL_PREFIX "wal" +#define WAL_PREFIX_LEN 3 +#define WAL_NOSUFFIX_LEN 20 +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) #define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" -#define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) //#define WAL_FILE_NUM 1 // 3 +#define WAL_FILESET_MAX 128 +#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) #define WAL_CUR_POS_READ_ONLY 1 #define WAL_CUR_FILE_READ_ONLY 2 @@ -94,6 +101,10 @@ typedef struct SWal { pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; + //file set + SArray* fileSet; + //reusable write head + SWalHead head; } SWal; // WAL HANDLE typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); @@ -104,21 +115,20 @@ void walCleanUp(); // handle open and ctl SWal *walOpen(const char *path, SWalCfg *pCfg); -void walStop(SWal *pWal); int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write //int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); -int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen); -int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); +int64_t walWrite(SWal *, int64_t index, uint8_t 
msgType, void *body, int32_t bodyLen); +//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); // apis for lifecycle management void walFsync(SWal *, bool force); int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); -// notify that previous log can be pruned safely +// notify that previous logs can be pruned safely int32_t walTakeSnapshot(SWal *, int64_t ver); //int32_t walDataCorrupted(SWal*); @@ -131,11 +141,6 @@ int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); -//internal -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); - #ifdef __cplusplus } #endif diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index e1fa8c72dd..2569af841f 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -20,16 +20,42 @@ #include "tfile.h" #include "walInt.h" -int walSetCurVerImpl(SWal *pWal, int64_t ver) { +int walSeekVerImpl(SWal *pWal, int64_t ver) { //close old file - //iterate all files - //open right file + int code = 0; + code = tfClose(pWal->curLogTfd); + if(code != 0) { + //TODO + } + code = tfClose(pWal->curIdxTfd); + if(code != 0) { + //TODO + } + //bsearch in fileSet + int fName = 0;//TODO + //open the right file + char fNameStr[WAL_FILE_LEN]; + sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName); + bool closed = 1; //TODO:read only + int64_t idxTfd = tfOpenReadWrite(fNameStr); + sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName); + int64_t logTfd = tfOpenReadWrite(fNameStr); + //seek position + int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE; + tfLseek(idxTfd, offset, SEEK_SET); //set cur version, cur file version and cur status - return 0; + pWal->curFileFirstVersion = fName; + pWal->curFileLastVersion = 1;//TODO + pWal->curLogTfd = logTfd; + 
pWal->curIdxTfd = idxTfd; + pWal->curVersion = ver; + pWal->curOffset = offset; + pWal->curStatus = 0;//TODO + return code; } -int walSetCurVer(SWal *pWal, int64_t ver) { - if(ver > pWal->lastVersion + 1) { +int walSeekVer(SWal *pWal, int64_t ver) { + if(ver > pWal->lastVersion) { //TODO: some records are skipped return -1; } @@ -40,13 +66,19 @@ int walSetCurVer(SWal *pWal, int64_t ver) { if(ver < pWal->snapshotVersion) { //TODO: seek snapshotted log } + if(ver >= pWal->curFileFirstVersion + && ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) { + + } if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + int index = 0; + index = 1; //back up to avoid inconsistency int64_t curVersion = pWal->curVersion; int64_t curOffset = pWal->curOffset; int64_t curFileFirstVersion = pWal->curFileFirstVersion; int64_t curFileLastVersion = pWal->curFileLastVersion; - if(walSetCurVerImpl(pWal, ver) < 0) { + if(walSeekVerImpl(pWal, ver) < 0) { //TODO: errno pWal->curVersion = curVersion; pWal->curOffset = curOffset; @@ -67,7 +99,7 @@ int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); } if(pWal->curVersion != ver) { - if(walSetCurVer(pWal, ver) != 0) { + if(walSeekVer(pWal, ver) != 0) { //TODO: some records are skipped return -1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 83f9c3b04e..d60cdfe118 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -18,8 +18,17 @@ #include "taoserror.h" #include "tref.h" #include "tfile.h" +#include "compare.h" #include "walInt.h" +//internal +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); + 
+static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER; +static int8_t walInited = 0; + typedef struct { int32_t refSetId; int32_t seq; @@ -35,11 +44,21 @@ static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); int32_t walInit() { + //TODO: change to atomic + pthread_mutex_lock(&walInitLock); + if(walInited) { + pthread_mutex_unlock(&walInitLock); + return 0; + } else { + walInited = 1; + pthread_mutex_unlock(&walInitLock); + } + int32_t code = 0; tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); code = pthread_mutex_init(&tsWal.mutex, NULL); - if (code) { + if (code != 0) { wError("failed to init wal mutex since %s", tstrerror(code)); return code; } @@ -61,6 +80,27 @@ void walCleanUp() { wInfo("wal module is cleaned up"); } +static int walLoadFileset(SWal *pWal) { + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent* ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + name[WAL_NOSUFFIX_LEN] = 0; + //validate file name by regex matching + if(1 /* regex match */) { + int64_t fnameInt64 = atoll(name); + taosArrayPush(pWal->fileSet, &fnameInt64); + } + } + taosArraySort(pWal->fileSet, compareInt64Val); + return 0; +} + SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { @@ -70,9 +110,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->vgId = pCfg->vgId; pWal->curLogTfd = -1; - /*pWal->curFileId = -1;*/ + pWal->curIdxTfd = -1; pWal->level = pCfg->walLevel; pWal->fsyncPeriod = pCfg->fsyncPeriod; + + memset(&pWal->head, 0, sizeof(SWalHead)); + pWal->head.sver = 0; + tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); @@ -129,6 +173,11 @@ static int32_t walInitObj(SWal *pWal) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); 
return TAOS_SYSTEM_ERROR(errno); } + pWal->fileSet = taosArrayInit(0, sizeof(int64_t)); + if(pWal->fileSet == NULL) { + wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } wDebug("vgId:%d, object is initialized", pWal->vgId); return 0; diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/walRead.c similarity index 65% rename from source/libs/wal/src/wal.c rename to source/libs/wal/src/walRead.c index 59f9c48814..b475183b7b 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/walRead.c @@ -14,18 +14,13 @@ */ #include "wal.h" +#include "tchecksum.h" -int32_t walCommit(SWal *pWal, int64_t ver) { - return 0; +static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) { + return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) && + taosCheckChecksum(body, bodyLen, pHead->cksumBody); } -int32_t walRollback(SWal *pWal, int64_t ver) { - return 0; -} - -int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { - return 0; -} int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { return 0; @@ -36,13 +31,16 @@ int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t r } int64_t walGetFirstVer(SWal *pWal) { - return 0; + if (pWal == NULL) return 0; + return pWal->firstVersion; } -int64_t walGetSnapshotVer(SWal *pWal) { - return 0; +int64_t walGetSnapshotVer(SWal *pWal) { + if (pWal == NULL) return 0; + return pWal->snapshotVersion; } int64_t walGetLastVer(SWal *pWal) { - return 0; + if (pWal == NULL) return 0; + return pWal->lastVersion; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index a8123f9c25..7563ec02c7 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,6 +21,18 @@ #include "tfile.h" #include "walInt.h" +int32_t walCommit(SWal *pWal, int64_t ver) { + return 0; +} + +int32_t walRollback(SWal *pWal, int64_t ver) { +
return 0; +} + +int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { + return 0; +} + #if 0 static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); @@ -112,60 +124,40 @@ void walRemoveAllOldFiles(void *handle) { } #endif -static void walUpdateChecksum(SWalHead *pHead) { - pHead->sver = 2; - pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len); -} - -static int walValidateChecksum(SWalHead *pHead) { - if (pHead->sver == 0) { // for compatible with wal before sver 1 - return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead)); - } else if (pHead->sver >= 1) { - uint32_t cksum = pHead->cksum; - pHead->cksum = 0; - return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); - } - - return 0; -} - -int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { +int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { if (pWal == NULL) return -1; - SWalHead *pHead = malloc(sizeof(SWalHead) + bodyLen); - if(pHead == NULL) { - return -1; - } - pHead->version = index; - int32_t code = 0; - // no wal if (!tfValid(pWal->curLogTfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; - if (pHead->version <= pWal->curVersion) return 0; + if (index > pWal->lastVersion + 1) return -1; - pHead->signature = WAL_SIGNATURE; - pHead->len = bodyLen; - memcpy(pHead->cont, body, bodyLen); + pWal->head.version = index; + int32_t code = 0; - walUpdateChecksum(pHead); + pWal->head.signature = WAL_SIGNATURE; + pWal->head.len = bodyLen; + pWal->head.msgType = msgType; - int32_t contLen = pHead->len + sizeof(SWalHead); + pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2); + pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)body, bodyLen); pthread_mutex_lock(&pWal->mutex); - if (tfWrite(pWal->curLogTfd, pHead, contLen) != contLen) { + if (tfWrite(pWal->curLogTfd,
&pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { + //ftruncate code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); - } else { - /*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/ - /*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/ - pWal->curVersion = pHead->version; } - pthread_mutex_unlock(&pWal->mutex); + if (tfWrite(pWal->curLogTfd, body, bodyLen) != bodyLen) { + //ftruncate + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + } + //TODO:write idx - ASSERT(contLen == pHead->len + sizeof(SWalHead)); + pthread_mutex_unlock(&pWal->mutex); return code; } @@ -254,6 +246,7 @@ static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { tfFsync(tfd); } +#if 0 static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { int64_t pos = *offset; while (1) { @@ -387,21 +380,4 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name); return code; } - -uint64_t walGetVersion(SWal *pWal) { - if (pWal == NULL) return 0; - - return pWal->curVersion; -} - -// Wal version in slave (dnode1) must be reset. -// Because after the data file is recovered from peer (dnode2), the new file version in dnode1 may become smaller than origin. -// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent, -// At this time, if dnode2 down, dnode1 switched to master.
After dnode2 start and restore data from dnode1, data loss will occur - -void walResetVersion(SWal *pWal, uint64_t newVer) { - if (pWal == NULL) return; - wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->curVersion, newVer); - - pWal->curVersion = newVer; -} +#endif