make more comile

This commit is contained in:
Hongze Cheng 2022-01-07 09:11:57 +00:00
parent 507763258c
commit 7dd5b4ad93
6 changed files with 353 additions and 337 deletions

View File

@ -12,7 +12,7 @@ else(0)
"src/tsdbWrite.c" "src/tsdbWrite.c"
"src/tsdbReadImpl.c" "src/tsdbReadImpl.c"
"src/tsdbFile.c" "src/tsdbFile.c"
# "src/tsdbFS.c" "src/tsdbFS.c"
) )
endif(0) endif(0)

View File

@ -29,10 +29,19 @@ typedef struct {
int64_t size; int64_t size;
} SKVRecord; } SKVRecord;
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
#if 0 #if 0
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo);

View File

@ -27,7 +27,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "tsdbCommit.h" #include "tsdbCommit.h"
// #include "tsdbFS.h" #include "tsdbFS.h"
#include "tsdbFile.h" #include "tsdbFile.h"
#include "tsdbLog.h" #include "tsdbLog.h"
#include "tsdbMemTable.h" #include "tsdbMemTable.h"
@ -47,12 +47,12 @@ struct STsdb {
STsdbMemTable * imem; STsdbMemTable * imem;
SRtn rtn; SRtn rtn;
SMemAllocatorFactory *pmaf; SMemAllocatorFactory *pmaf;
// STsdbFS fs; STsdbFS fs;
}; };
#define REPO_ID(r) 0 #define REPO_ID(r) 0
#define REPO_CFG(r) (&(r)->config) #define REPO_CFG(r) (&(r)->config)
// #define REPO_FS(r) (&(r)->fs) #define REPO_FS(r) (&(r)->fs)
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -32,6 +32,23 @@ int tsdbCommit(STsdb *pTsdb) {
return 0; return 0;
} }
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision));
tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
#if 0 #if 0
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
@ -420,23 +437,6 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) {
return buf; return buf;
} }
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision));
tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) {
char buf[64] = "\0"; char buf[64] = "\0";
void * pBuf = buf; void * pBuf = buf;

View File

@ -34,8 +34,16 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static int tsdbRestoreCurrent(STsdb *pRepo); static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2); static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired); static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdb *pRepo); // static int tsdbProcessExpiredFS(STsdb *pRepo);
static int tsdbCreateMeta(STsdb *pRepo); // static int tsdbCreateMeta(STsdb *pRepo);
static void tsdbGetRootDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
}
static void tsdbGetDataDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
}
// For backward compatibility // For backward compatibility
// ================== CURRENT file header info // ================== CURRENT file header info
@ -104,11 +112,11 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
} }
static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
ASSERT(pStatus->pmf); // ASSERT(pStatus->pmf);
int tlen = 0; int tlen = 0;
tlen += tsdbEncodeSMFile(buf, pStatus->pmf); // tlen += tsdbEncodeSMFile(buf, pStatus->pmf);
tlen += tsdbEncodeDFileSetArray(buf, pStatus->df); tlen += tsdbEncodeDFileSetArray(buf, pStatus->df);
return tlen; return tlen;
@ -117,9 +125,9 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) { static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus); tsdbResetFSStatus(pStatus);
pStatus->pmf = &(pStatus->mf); // pStatus->pmf = &(pStatus->mf);
buf = tsdbDecodeSMFile(buf, pStatus->pmf); // buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df); buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
return buf; return buf;
@ -132,7 +140,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) {
return NULL; return NULL;
} }
TSDB_FILE_SET_CLOSED(&(pStatus->mf)); // TSDB_FILE_SET_CLOSED(&(pStatus->mf));
pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet)); pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
if (pStatus->df == NULL) { if (pStatus->df == NULL) {
@ -158,18 +166,18 @@ static void tsdbResetFSStatus(SFSStatus *pStatus) {
return; return;
} }
TSDB_FILE_SET_CLOSED(&(pStatus->mf)); // TSDB_FILE_SET_CLOSED(&(pStatus->mf));
pStatus->pmf = NULL; // pStatus->pmf = NULL;
taosArrayClear(pStatus->df); taosArrayClear(pStatus->df);
} }
static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) { // static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) {
ASSERT(pStatus->pmf == NULL); // ASSERT(pStatus->pmf == NULL);
pStatus->pmf = &(pStatus->mf); // pStatus->pmf = &(pStatus->mf);
tsdbInitMFileEx(pStatus->pmf, (SMFile *)pMFile); // tsdbInitMFileEx(pStatus->pmf, (SMFile *)pMFile);
} // }
static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
if (taosArrayPush(pStatus->df, (void *)pSet) == NULL) { if (taosArrayPush(pStatus->df, (void *)pSet) == NULL) {
@ -240,61 +248,61 @@ void *tsdbFreeFS(STsdbFS *pfs) {
return NULL; return NULL;
} }
static int tsdbProcessExpiredFS(STsdb *pRepo) { // static int tsdbProcessExpiredFS(STsdb *pRepo) {
tsdbStartFSTxn(pRepo, 0, 0); // tsdbStartFSTxn(pRepo, 0, 0);
if (tsdbCreateMeta(pRepo) < 0) { // // if (tsdbCreateMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); // // tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // // return -1;
} // // }
if (tsdbApplyRtn(pRepo) < 0) { // if (tsdbApplyRtn(pRepo) < 0) {
tsdbEndFSTxnWithError(REPO_FS(pRepo)); // tsdbEndFSTxnWithError(REPO_FS(pRepo));
tsdbError("vgId:%d failed to apply rtn since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to apply rtn since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
if (tsdbEndFSTxn(pRepo) < 0) { // if (tsdbEndFSTxn(pRepo) < 0) {
tsdbError("vgId:%d failed to end fs txn since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to end fs txn since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
return 0; // return 0;
} // }
static int tsdbCreateMeta(STsdb *pRepo) { // static int tsdbCreateMeta(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); // STsdbFS *pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf; // SMFile * pOMFile = pfs->cstatus->pmf;
SMFile mf; // SMFile mf;
SDiskID did; // SDiskID did;
if (pOMFile != NULL) { // if (pOMFile != NULL) {
// keep the old meta file // // keep the old meta file
tsdbUpdateMFile(pfs, pOMFile); // tsdbUpdateMFile(pfs, pOMFile);
return 0; // return 0;
} // }
// Create a new meta file // // Create a new meta file
did.level = TFS_PRIMARY_LEVEL; // did.level = TFS_PRIMARY_LEVEL;
did.id = TFS_PRIMARY_ID; // did.id = TFS_PRIMARY_ID;
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); // tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateMFile(&mf, true) < 0) { // if (tsdbCreateMFile(&mf, true) < 0) {
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
tsdbInfo("vgId:%d meta file %s is created", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); // tsdbInfo("vgId:%d meta file %s is created", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
if (tsdbUpdateMFileHeader(&mf) < 0) { // if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno));
tsdbApplyMFileChange(&mf, pOMFile); // tsdbApplyMFileChange(&mf, pOMFile);
return -1; // return -1;
} // }
TSDB_FILE_FSYNC(&mf); // TSDB_FILE_FSYNC(&mf);
tsdbCloseMFile(&mf); // tsdbCloseMFile(&mf);
tsdbUpdateMFile(pfs, &mf); // tsdbUpdateMFile(pfs, &mf);
return 0; // return 0;
} // }
int tsdbOpenFS(STsdb *pRepo) { int tsdbOpenFS(STsdb *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
@ -313,9 +321,9 @@ int tsdbOpenFS(STsdb *pRepo) {
} }
tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired); tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired);
if (nExpired > 0) { // if (nExpired > 0) {
tsdbProcessExpiredFS(pRepo); // tsdbProcessExpiredFS(pRepo);
} // }
} else { } else {
// should skip expired fileset inside of the function // should skip expired fileset inside of the function
if (tsdbRestoreCurrent(pRepo) < 0) { if (tsdbRestoreCurrent(pRepo) < 0) {
@ -329,11 +337,11 @@ int tsdbOpenFS(STsdb *pRepo) {
return -1; return -1;
} }
// Load meta cache if has meta file // // Load meta cache if has meta file
if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) { // if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) {
tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
return 0; return 0;
} }
@ -350,11 +358,11 @@ void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) {
pfs->intxn = true; pfs->intxn = true;
tsdbResetFSStatus(pfs->nstatus); tsdbResetFSStatus(pfs->nstatus);
pfs->nstatus->meta = pfs->cstatus->meta; pfs->nstatus->meta = pfs->cstatus->meta;
if (pfs->cstatus->pmf == NULL) { // if (pfs->cstatus->pmf == NULL) {
pfs->nstatus->meta.version = 0; pfs->nstatus->meta.version = 0;
} else { // } else {
pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1; // pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1;
} // }
pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd; pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd;
pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd; pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd;
} }
@ -393,7 +401,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) {
return 0; return 0;
} }
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); } // void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); }
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
@ -415,8 +423,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
} }
fsheader.version = TSDB_FS_VERSION; fsheader.version = TSDB_FS_VERSION;
if (pStatus->pmf == NULL) { if (taosArrayGetSize(pStatus->df) == 0) {
ASSERT(taosArrayGetSize(pStatus->df) == 0);
fsheader.len = 0; fsheader.len = 0;
} else { } else {
fsheader.len = tsdbEncodeFSStatus(NULL, pStatus) + sizeof(TSCKSUM); fsheader.len = tsdbEncodeFSStatus(NULL, pStatus) + sizeof(TSCKSUM);
@ -429,7 +436,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE); taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);
if (taosWrite(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (taosWriteFile(fd, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
remove(tfname); remove(tfname);
@ -448,7 +455,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
tsdbEncodeFSStatus(&ptr, pStatus); tsdbEncodeFSStatus(&ptr, pStatus);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len); taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);
if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) { if (taosWriteFile(fd, pBuf, fsheader.len) < fsheader.len) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
(void)remove(tfname); (void)remove(tfname);
@ -458,7 +465,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
} }
// fsync, close and rename // fsync, close and rename
if (taosFsync(fd) < 0) { if (taosFsyncFile(fd) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
remove(tfname); remove(tfname);
@ -467,7 +474,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
} }
(void)close(fd); (void)close(fd);
(void)taosRename(tfname, cfname); (void)taosRenameFile(tfname, cfname);
taosTZfree(pBuf); taosTZfree(pBuf);
return 0; return 0;
@ -484,7 +491,7 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
sizeTo = taosArrayGetSize(pTo->df); sizeTo = taosArrayGetSize(pTo->df);
// Apply meta file change // Apply meta file change
(void)tsdbApplyMFileChange(pFrom->pmf, pTo->pmf); // (void)tsdbApplyMFileChange(pFrom->pmf, pTo->pmf);
// Apply SDFileSet change // Apply SDFileSet change
if (ifrom >= sizeFrom) { if (ifrom >= sizeFrom) {
@ -664,7 +671,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
goto _err; goto _err;
} }
int nread = (int)taosRead(fd, buffer, TSDB_FILE_HEAD_SIZE); int nread = (int)taosReadFile(fd, buffer, TSDB_FILE_HEAD_SIZE);
if (nread < 0) { if (nread < 0) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current, tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current,
strerror(errno)); strerror(errno));
@ -698,7 +705,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
goto _err; goto _err;
} }
nread = (int)taosRead(fd, buffer, fsheader.len); nread = (int)taosReadFile(fd, buffer, fsheader.len);
if (nread < 0) { if (nread < 0) {
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno)); tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -741,10 +748,10 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus; SFSStatus *pStatus = pfs->cstatus;
if (tsdbScanAndTryFixMFile(pRepo) < 0) { // if (tsdbScanAndTryFixMFile(pRepo) < 0) {
tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
size_t size = taosArrayGetSize(pStatus->df); size_t size = taosArrayGetSize(pStatus->df);
@ -763,141 +770,141 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) {
return 0; return 0;
} }
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) { // int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) {
char tbuf[128]; // char tbuf[128];
STsdbFS * pfs = REPO_FS(pRepo); // STsdbFS * pfs = REPO_FS(pRepo);
SMFile mf; // SMFile mf;
SMFile * pMFile = &mf; // SMFile * pMFile = &mf;
void * pBuf = NULL; // void * pBuf = NULL;
SKVRecord rInfo; // SKVRecord rInfo;
int64_t maxBufSize = 0; // int64_t maxBufSize = 0;
SMFInfo minfo; // SMFInfo minfo;
taosHashClear(pfs->metaCache); // taosHashClear(pfs->metaCache);
// No meta file, just return // // No meta file, just return
if (pfs->cstatus->pmf == NULL) return 0; // if (pfs->cstatus->pmf == NULL) return 0;
mf = pfs->cstatus->mf; // mf = pfs->cstatus->mf;
// Load cache first // // Load cache first
if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { // if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) {
return -1; // return -1;
} // }
if (tsdbLoadMFileHeader(pMFile, &minfo) < 0) { // if (tsdbLoadMFileHeader(pMFile, &minfo) < 0) {
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
while (true) { // while (true) {
int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord)); // int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord));
if (tsize == 0) break; // if (tsize == 0) break;
if (tsize < 0) { // if (tsize < 0) {
tsdbError("vgId:%d failed to read META file since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to read META file since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
if (tsize < sizeof(SKVRecord)) { // if (tsize < sizeof(SKVRecord)) {
tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord), // tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord),
TSDB_FILE_FULL_NAME(pMFile)); // TSDB_FILE_FULL_NAME(pMFile));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; // terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
void *ptr = tsdbDecodeKVRecord(tbuf, &rInfo); // void *ptr = tsdbDecodeKVRecord(tbuf, &rInfo);
ASSERT(POINTER_DISTANCE(ptr, tbuf) == sizeof(SKVRecord)); // ASSERT(POINTER_DISTANCE(ptr, tbuf) == sizeof(SKVRecord));
// ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true); // // ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true);
if (rInfo.offset < 0) { // if (rInfo.offset < 0) {
taosHashRemove(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid)); // taosHashRemove(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid));
#if 0 // #if 0
pStore->info.size += sizeof(SKVRecord); // pStore->info.size += sizeof(SKVRecord);
pStore->info.nRecords--; // pStore->info.nRecords--;
pStore->info.nDels++; // pStore->info.nDels++;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); // pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
#endif // #endif
} else { // } else {
ASSERT(rInfo.offset > 0 && rInfo.size > 0); // ASSERT(rInfo.offset > 0 && rInfo.size > 0);
if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { // if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo), // tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile)); // TSDB_FILE_FULL_NAME(pMFile));
terrno = TSDB_CODE_COM_OUT_OF_MEMORY; // terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
maxBufSize = MAX(maxBufSize, rInfo.size); // maxBufSize = MAX(maxBufSize, rInfo.size);
if (tsdbSeekMFile(pMFile, rInfo.size, SEEK_CUR) < 0) { // if (tsdbSeekMFile(pMFile, rInfo.size, SEEK_CUR) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), // tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
tstrerror(terrno)); // tstrerror(terrno));
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
#if 0 // #if 0
pStore->info.size += (sizeof(SKVRecord) + rInfo.size); // pStore->info.size += (sizeof(SKVRecord) + rInfo.size);
pStore->info.nRecords++; // pStore->info.nRecords++;
#endif // #endif
} // }
} // }
if (recoverMeta) { // if (recoverMeta) {
pBuf = malloc((size_t)maxBufSize); // pBuf = malloc((size_t)maxBufSize);
if (pBuf == NULL) { // if (pBuf == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
SKVRecord *pRecord = taosHashIterate(pfs->metaCache, NULL); // SKVRecord *pRecord = taosHashIterate(pfs->metaCache, NULL);
while (pRecord) { // while (pRecord) {
if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { // if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), // tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
tstrerror(terrno)); // tstrerror(terrno));
tfree(pBuf); // tfree(pBuf);
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); // int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size);
if (nread < 0) { // if (nread < 0) {
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), // tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
tstrerror(terrno)); // tstrerror(terrno));
tfree(pBuf); // tfree(pBuf);
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
if (nread < pRecord->size) { // if (nread < pRecord->size) {
tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", // tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); // REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; // terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tfree(pBuf); // tfree(pBuf);
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
if (tsdbRestoreTable(pRepo, pBuf, (int)pRecord->size) < 0) { // if (tsdbRestoreTable(pRepo, pBuf, (int)pRecord->size) < 0) {
tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid, // tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid,
tstrerror(terrno)); // tstrerror(terrno));
tfree(pBuf); // tfree(pBuf);
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
return -1; // return -1;
} // }
pRecord = taosHashIterate(pfs->metaCache, pRecord); // pRecord = taosHashIterate(pfs->metaCache, pRecord);
} // }
tsdbOrgMeta(pRepo); // tsdbOrgMeta(pRepo);
} // }
tsdbCloseMFile(pMFile); // tsdbCloseMFile(pMFile);
tfree(pBuf); // tfree(pBuf);
return 0; // return 0;
} // }
static int tsdbScanRootDir(STsdb *pRepo) { static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN]; char rootDir[TSDB_FILENAME_LEN];
@ -920,9 +927,9 @@ static int tsdbScanRootDir(STsdb *pRepo) {
continue; continue;
} }
if (pfs->cstatus->pmf && tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) { // if (/*pfs->cstatus->pmf && */ tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) {
continue; // continue;
} // }
(void)tfsremove(pf); (void)tfsremove(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf)); tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
@ -977,126 +984,126 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
return false; return false;
} }
static int tsdbRestoreMeta(STsdb *pRepo) { // static int tsdbRestoreMeta(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN]; // char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN]; // char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL; // TDIR * tdir = NULL;
const TFILE *pf = NULL; // const TFILE *pf = NULL;
const char * pattern = "^meta(-ver[0-9]+)?$"; // const char * pattern = "^meta(-ver[0-9]+)?$";
regex_t regex; // regex_t regex;
STsdbFS * pfs = REPO_FS(pRepo); // STsdbFS * pfs = REPO_FS(pRepo);
regcomp(&regex, pattern, REG_EXTENDED); // regcomp(&regex, pattern, REG_EXTENDED);
tsdbInfo("vgId:%d try to restore meta", REPO_ID(pRepo)); // tsdbInfo("vgId:%d try to restore meta", REPO_ID(pRepo));
tsdbGetRootDir(REPO_ID(pRepo), rootDir); // tsdbGetRootDir(REPO_ID(pRepo), rootDir);
tdir = tfsOpendir(rootDir); // tdir = tfsOpendir(rootDir);
if (tdir == NULL) { // if (tdir == NULL) {
tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); // tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
regfree(&regex); // regfree(&regex);
return -1; // return -1;
} // }
while ((pf = tfsReaddir(tdir))) { // while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname); // tfsbasename(pf, bname);
if (strcmp(bname, "data") == 0) { // if (strcmp(bname, "data") == 0) {
// Skip the data/ directory // // Skip the data/ directory
continue; // continue;
} // }
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) { // if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) {
// Skip current.t file // // Skip current.t file
tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf)); // tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsremove(pf); // (void)tfsremove(pf);
continue; // continue;
} // }
int code = regexec(&regex, bname, 0, NULL, 0); // int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) { // if (code == 0) {
// Match // // Match
if (pfs->cstatus->pmf != NULL) { // if (pfs->cstatus->pmf != NULL) {
tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo), // tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf)); // TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; // terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tfsClosedir(tdir); // tfsClosedir(tdir);
regfree(&regex); // regfree(&regex);
return -1; // return -1;
} else { // } else {
uint32_t _version = 0; // uint32_t _version = 0;
if (strcmp(bname, "meta") != 0) { // if (strcmp(bname, "meta") != 0) {
sscanf(bname, "meta-ver%" PRIu32, &_version); // sscanf(bname, "meta-ver%" PRIu32, &_version);
pfs->cstatus->meta.version = _version; // pfs->cstatus->meta.version = _version;
} // }
pfs->cstatus->pmf = &(pfs->cstatus->mf); // pfs->cstatus->pmf = &(pfs->cstatus->mf);
pfs->cstatus->pmf->f = *pf; // pfs->cstatus->pmf->f = *pf;
TSDB_FILE_SET_CLOSED(pfs->cstatus->pmf); // TSDB_FILE_SET_CLOSED(pfs->cstatus->pmf);
if (tsdbOpenMFile(pfs->cstatus->pmf, O_RDONLY) < 0) { // if (tsdbOpenMFile(pfs->cstatus->pmf, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno));
tfsClosedir(tdir); // tfsClosedir(tdir);
regfree(&regex); // regfree(&regex);
return -1; // return -1;
} // }
if (tsdbLoadMFileHeader(pfs->cstatus->pmf, &(pfs->cstatus->pmf->info)) < 0) { // if (tsdbLoadMFileHeader(pfs->cstatus->pmf, &(pfs->cstatus->pmf->info)) < 0) {
tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbCloseMFile(pfs->cstatus->pmf); // tsdbCloseMFile(pfs->cstatus->pmf);
tfsClosedir(tdir); // tfsClosedir(tdir);
regfree(&regex); // regfree(&regex);
return -1; // return -1;
} // }
if (tsdbForceKeepFile) { // if (tsdbForceKeepFile) {
struct stat tfstat; // struct stat tfstat;
// Get real file size // // Get real file size
if (fstat(pfs->cstatus->pmf->fd, &tfstat) < 0) { // if (fstat(pfs->cstatus->pmf->fd, &tfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); // terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(pfs->cstatus->pmf); // tsdbCloseMFile(pfs->cstatus->pmf);
tfsClosedir(tdir); // tfsClosedir(tdir);
regfree(&regex); // regfree(&regex);
return -1; // return -1;
} // }
if (pfs->cstatus->pmf->info.size != tfstat.st_size) { // if (pfs->cstatus->pmf->info.size != tfstat.st_size) {
int64_t tfsize = pfs->cstatus->pmf->info.size; // int64_t tfsize = pfs->cstatus->pmf->info.size;
pfs->cstatus->pmf->info.size = tfstat.st_size; // pfs->cstatus->pmf->info.size = tfstat.st_size;
tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo), // tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), tfsize, pfs->cstatus->pmf->info.size); // TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), tfsize, pfs->cstatus->pmf->info.size);
} // }
} // }
tsdbCloseMFile(pfs->cstatus->pmf); // tsdbCloseMFile(pfs->cstatus->pmf);
} // }
} else if (code == REG_NOMATCH) { // } else if (code == REG_NOMATCH) {
// Not match // // Not match
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf)); // tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
tfsremove(pf); // tfsremove(pf);
continue; // continue;
} else { // } else {
// Has other error // // Has other error
tsdbError("vgId:%d failed to restore meta file while run regexec since %s", REPO_ID(pRepo), strerror(code)); // tsdbError("vgId:%d failed to restore meta file while run regexec since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code); // terrno = TAOS_SYSTEM_ERROR(code);
tfsClosedir(tdir); // tfsClosedir(tdir);
regfree(&regex); // regfree(&regex);
return -1; // return -1;
} // }
} // }
if (pfs->cstatus->pmf) { // if (pfs->cstatus->pmf) {
tsdbInfo("vgId:%d meta file %s is restored", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pfs->cstatus->pmf)); // tsdbInfo("vgId:%d meta file %s is restored", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pfs->cstatus->pmf));
} else { // } else {
tsdbInfo("vgId:%d no meta file is restored", REPO_ID(pRepo)); // tsdbInfo("vgId:%d no meta file is restored", REPO_ID(pRepo));
} // }
tfsClosedir(tdir); // tfsClosedir(tdir);
regfree(&regex); // regfree(&regex);
return 0; // return 0;
} // }
static int tsdbRestoreDFileSet(STsdb *pRepo) { static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN]; char dataDir[TSDB_FILENAME_LEN];
@ -1268,11 +1275,11 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
} }
static int tsdbRestoreCurrent(STsdb *pRepo) { static int tsdbRestoreCurrent(STsdb *pRepo) {
// Loop to recover mfile // // Loop to recover mfile
if (tsdbRestoreMeta(pRepo) < 0) { // if (tsdbRestoreMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); // tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; // return -1;
} // }
// Loop to recover dfile set // Loop to recover dfile set
if (tsdbRestoreDFileSet(pRepo) < 0) { if (tsdbRestoreDFileSet(pRepo) < 0) {
@ -1280,7 +1287,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
return -1; return -1;
} }
if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pRepo->fs.cstatus, REPO_ID(pRepo)) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }

View File

@ -335,7 +335,7 @@ static int tsdbTbDataComp(const void *arg1, const void *arg2) {
static char *tsdbTbDataGetUid(const void *arg) { static char *tsdbTbDataGetUid(const void *arg) {
STbData *pTbData = (STbData *)arg; STbData *pTbData = (STbData *)arg;
return &(pTbData->uid); return (char *)(&(pTbData->uid));
} }
/* ------------------------ REFACTORING ------------------------ */ /* ------------------------ REFACTORING ------------------------ */