Merge branch 'feature/linux' of https://github.com/taosdata/TDengine into feature/linux
This commit is contained in:
commit
eebc675863
|
@ -142,6 +142,8 @@ int64_t taosCopy(char *from, char *to) {
|
|||
if (bytes < sizeof(buffer)) break;
|
||||
}
|
||||
|
||||
fsync(fidto);
|
||||
|
||||
close(fidfrom);
|
||||
close(fidto);
|
||||
return size;
|
||||
|
|
|
@ -50,7 +50,7 @@ typedef struct {
|
|||
} SMFile;
|
||||
|
||||
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
||||
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
|
||||
void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile);
|
||||
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
||||
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
||||
int tsdbEncodeSMFileEx(void** buf, SMFile* pMFile);
|
||||
|
|
|
@ -52,7 +52,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo);
|
|||
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen);
|
||||
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
|
||||
static int tsdbCommitTSData(STsdbRepo *pRepo);
|
||||
static int tsdbStartCommit(STsdbRepo *pRepo);
|
||||
static void tsdbStartCommit(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
|
||||
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
|
||||
static int tsdbCreateCommitIters(SCommitH *pCommith);
|
||||
|
@ -84,10 +84,7 @@ static int tsdbApplyRtn(STsdbRepo *pRepo);
|
|||
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
||||
|
||||
void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||
if (tsdbStartCommit(pRepo) < 0) {
|
||||
tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
tsdbStartCommit(pRepo);
|
||||
|
||||
// Commit to update meta file
|
||||
if (tsdbCommitMeta(pRepo) < 0) {
|
||||
|
@ -138,11 +135,15 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
|||
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
|
||||
|
||||
if (tsdbCreateMFile(&mf, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
|
||||
} else {
|
||||
tsdbInitMFileEx(&mf, pOMFile);
|
||||
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
|
||||
tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -154,12 +155,20 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
|||
if (pAct->act == TSDB_UPDATE_META) {
|
||||
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
|
||||
if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
|
||||
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
||||
tstrerror(terrno));
|
||||
tsdbCloseMFile(&mf);
|
||||
tsdbApplyMFileChange(&mf, pOMFile);
|
||||
// TODO: need to reload metaCache
|
||||
return -1;
|
||||
}
|
||||
} else if (pAct->act == TSDB_DROP_META) {
|
||||
if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) {
|
||||
tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
||||
tstrerror(terrno));
|
||||
tsdbCloseMFile(&mf);
|
||||
tsdbApplyMFileChange(&mf, pOMFile);
|
||||
// TODO: need to reload metaCache
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
|
@ -168,6 +177,9 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
|||
}
|
||||
|
||||
if (tsdbUpdateMFileHeader(&mf) < 0) {
|
||||
tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno));
|
||||
tsdbApplyMFileChange(&mf, pOMFile);
|
||||
// TODO: need to reload metaCache
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -208,6 +220,8 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
|
|||
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
|
||||
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision));
|
||||
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision));
|
||||
tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
|
||||
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
|
||||
}
|
||||
|
||||
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) {
|
||||
|
@ -238,7 +252,7 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void
|
|||
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
|
||||
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid));
|
||||
if (pRecord != NULL) {
|
||||
pMFile->info.tombSize += pRecord->size;
|
||||
pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
|
||||
} else {
|
||||
pMFile->info.nRecords++;
|
||||
}
|
||||
|
@ -253,7 +267,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
|
|||
|
||||
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid));
|
||||
if (pRecord == NULL) {
|
||||
tsdbError("failed to drop KV store record with key %" PRIu64 " since not find", uid);
|
||||
tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -264,11 +278,11 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
|
|||
void *pBuf = buf;
|
||||
tsdbEncodeKVRecord(&pBuf, &rInfo);
|
||||
|
||||
if (tsdbAppendMFile(pMFile, buf, POINTER_DISTANCE(pBuf, buf), NULL) < 0) {
|
||||
if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
|
||||
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord));
|
||||
pMFile->info.nDels++;
|
||||
pMFile->info.nRecords--;
|
||||
pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
|
||||
|
@ -302,7 +316,12 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
|||
// Skip expired memory data and expired FSET
|
||||
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
|
||||
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
|
||||
if (pSet->fid >= commith.rtn.minFid) break;
|
||||
if (pSet->fid < commith.rtn.minFid) {
|
||||
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
|
||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Loop to commit to each file
|
||||
|
@ -349,7 +368,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbStartCommit(STsdbRepo *pRepo) {
|
||||
static void tsdbStartCommit(STsdbRepo *pRepo) {
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
|
||||
ASSERT(pMem->numOfRows > 0 || listNEles(pMem->actList) > 0);
|
||||
|
@ -360,7 +379,6 @@ static int tsdbStartCommit(STsdbRepo *pRepo) {
|
|||
tsdbStartFSTxn(pRepo, pMem->pointsAdd, pMem->storageAdd);
|
||||
|
||||
pRepo->code = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
|
||||
|
@ -413,14 +431,18 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
|||
if (pIter->pTable == NULL) continue;
|
||||
|
||||
if (tsdbCommitToTable(pCommith, tid) < 0) {
|
||||
// TODO: revert the file change
|
||||
tsdbCloseCommitFile(pCommith, true);
|
||||
// revert the file change
|
||||
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbWriteBlockIdx(pCommith) < 0) {
|
||||
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||
tsdbCloseCommitFile(pCommith, true);
|
||||
// revert the file change
|
||||
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -674,7 +696,11 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
|||
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
|
||||
if (tsdbWriteBlockInfo(pCommith) < 0) return -1;
|
||||
if (tsdbWriteBlockInfo(pCommith) < 0) {
|
||||
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
|
||||
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -926,6 +952,8 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) {
|
|||
|
||||
if (nidx <= 0) {
|
||||
// All data are deleted
|
||||
pHeadf->info.offset = 0;
|
||||
pHeadf->info.len = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1227,7 +1255,6 @@ static void tsdbResetCommitFile(SCommitH *pCommith) {
|
|||
}
|
||||
|
||||
static void tsdbResetCommitTable(SCommitH *pCommith) {
|
||||
tdResetDataCols(pCommith->pDataCols);
|
||||
taosArrayClear(pCommith->aSubBlk);
|
||||
taosArrayClear(pCommith->aSupBlk);
|
||||
pCommith->pTable = NULL;
|
||||
|
@ -1256,6 +1283,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet),
|
||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
||||
} else {
|
||||
pCommith->isRFileSet = false;
|
||||
}
|
||||
|
@ -1266,6 +1296,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
|
||||
|
||||
if (tsdbCreateDFileSet(pWSet, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
|
||||
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
}
|
||||
|
@ -1274,6 +1306,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
|
||||
pCommith->isDFileSame = false;
|
||||
pCommith->isLFileSame = false;
|
||||
|
||||
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
|
||||
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
|
||||
} else {
|
||||
did.level = TSDB_FSET_LEVEL(pSet);
|
||||
did.id = TSDB_FSET_ID(pSet);
|
||||
|
@ -1285,6 +1320,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
||||
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
|
||||
if (tsdbCreateDFile(pWHeadf, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
|
||||
tstrerror(terrno));
|
||||
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
return -1;
|
||||
|
@ -1296,7 +1334,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
tsdbInitDFileEx(pWDataf, pRDataf);
|
||||
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
|
||||
tsdbCloseDFile(pWHeadf);
|
||||
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
|
||||
tstrerror(terrno));
|
||||
|
||||
tsdbCloseDFileSet(pWSet);
|
||||
tsdbRemoveDFile(pWHeadf);
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
|
@ -1313,6 +1354,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
pCommith->isLFileSame = true;
|
||||
|
||||
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
|
||||
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
|
||||
tstrerror(terrno));
|
||||
|
||||
tsdbCloseDFileSet(pWSet);
|
||||
tsdbRemoveDFile(pWHeadf);
|
||||
if (pCommith->isRFileSet) {
|
||||
|
@ -1325,6 +1369,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
pCommith->isLFileSame = false;
|
||||
|
||||
if (tsdbCreateDFile(pWLastf, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
|
||||
tstrerror(terrno));
|
||||
|
||||
tsdbCloseDFileSet(pWSet);
|
||||
tsdbRemoveDFile(pWHeadf);
|
||||
if (pCommith->isRFileSet) {
|
||||
|
@ -1360,7 +1407,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
|
|||
if (pBlock->last) {
|
||||
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
|
||||
} else {
|
||||
if (mergeRows < pCfg->maxRowsPerFileBlock) return true;
|
||||
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1373,12 +1420,16 @@ static int tsdbApplyRtn(STsdbRepo *pRepo) {
|
|||
STsdbFS * pfs = REPO_FS(pRepo);
|
||||
SDFileSet *pSet;
|
||||
|
||||
// Get retentioni snapshot
|
||||
// Get retention snapshot
|
||||
tsdbGetRtnSnap(pRepo, &rtn);
|
||||
|
||||
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
|
||||
while ((pSet = tsdbFSIterNext(&fsiter))) {
|
||||
if (pSet->fid < rtn.minFid) continue;
|
||||
if (pSet->fid < rtn.minFid) {
|
||||
tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
|
||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) {
|
||||
return -1;
|
||||
|
@ -1392,10 +1443,13 @@ static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
|||
SDiskID did;
|
||||
SDFileSet nSet;
|
||||
STsdbFS * pfs = REPO_FS(pRepo);
|
||||
int level;
|
||||
|
||||
ASSERT(pSet->fid >= pRtn->minFid);
|
||||
|
||||
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, pRtn), &(did.level), &(did.id));
|
||||
level = tsdbGetFidLevel(pSet->fid, pRtn);
|
||||
|
||||
tfsAllocDisk(level, &(did.level), &(did.id));
|
||||
if (did.level == TFS_UNDECIDED_LEVEL) {
|
||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||
return -1;
|
||||
|
@ -1406,12 +1460,17 @@ static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
|||
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
|
||||
|
||||
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
||||
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
||||
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
|
||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
|
||||
} else {
|
||||
// On a correct level
|
||||
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
|
||||
|
|
|
@ -701,6 +701,8 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
|
|||
int64_t maxBufSize = 0;
|
||||
SMFInfo minfo;
|
||||
|
||||
taosHashEmpty(pfs->metaCache);
|
||||
|
||||
// No meta file, just return
|
||||
if (pfs->cstatus->pmf == NULL) return 0;
|
||||
|
||||
|
@ -718,6 +720,12 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
|
|||
while (true) {
|
||||
int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord));
|
||||
if (tsize == 0) break;
|
||||
|
||||
if (tsize < 0) {
|
||||
tsdbError("vgId:%d failed to read META file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsize < sizeof(SKVRecord)) {
|
||||
tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord),
|
||||
TSDB_FILE_FULL_NAME(pMFile));
|
||||
|
@ -840,7 +848,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) {
|
||||
if (pfs->cstatus->pmf && tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
|
|||
tfsInitFile(TSDB_FILE_F(pMFile), did.level, did.id, fname);
|
||||
}
|
||||
|
||||
void tsdbInitMFileEx(SMFile *pMFile, SMFile *pOMFile) {
|
||||
void tsdbInitMFileEx(SMFile *pMFile, const SMFile *pOMFile) {
|
||||
*pMFile = *pOMFile;
|
||||
TSDB_FILE_SET_CLOSED(pMFile);
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
|
|||
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
|
||||
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
|
||||
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
|
||||
pVnode->tsdbCfg.update = vnodeMsg->cfg.update;
|
||||
pVnode->tsdbCfg.cacheLastRow = vnodeMsg->cfg.cacheLastRow;
|
||||
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
|
||||
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
|
||||
|
@ -227,6 +228,15 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
|
||||
|
||||
cJSON *update = cJSON_GetObjectItem(root, "update");
|
||||
if (!update || update->type != cJSON_Number) {
|
||||
vError("vgId: %d, failed to read %s, update not found", pVnode->vgId, file);
|
||||
vnodeMsg.cfg.update = 0;
|
||||
vnodeMsg.cfg.vgCfgVersion = 0;
|
||||
} else {
|
||||
vnodeMsg.cfg.update = (int8_t)update->valueint;
|
||||
}
|
||||
|
||||
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
|
||||
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
|
||||
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
|
||||
|
@ -325,6 +335,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
|
|||
len += snprintf(content + len, maxLen - len, " \"dbReplica\": %d,\n", pMsg->cfg.dbReplica);
|
||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
|
||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
|
||||
len += snprintf(content + len, maxLen - len, " \"update\": %d,\n", pMsg->cfg.update);
|
||||
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
|
||||
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
|
||||
for (int32_t i = 0; i < pMsg->cfg.vgReplica; i++) {
|
||||
|
|
|
@ -44,7 +44,8 @@ class TDTestRetetion:
|
|||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows)
|
||||
os.system("sudo timedatectl set-ntp true")
|
||||
time.sleep(40)
|
||||
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1)))
|
||||
time.sleep(5)
|
||||
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
|
||||
|
||||
def run(self):
|
||||
|
@ -63,7 +64,7 @@ class TDTestRetetion:
|
|||
tdLog.info("=============== step2")
|
||||
tdDnodes.stop(1)
|
||||
os.system("sudo timedatectl set-ntp false")
|
||||
os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
|
||||
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48)))
|
||||
tdDnodes.start(1)
|
||||
cmd = 'insert into test values(now,5);'
|
||||
tdDnodes.stop(1)
|
||||
|
@ -79,7 +80,7 @@ class TDTestRetetion:
|
|||
self.checkRows(5,cmd)
|
||||
tdLog.info("=============== step3")
|
||||
tdDnodes.stop(1)
|
||||
os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
|
||||
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48)))
|
||||
tdDnodes.start(1)
|
||||
tdLog.info(cmd)
|
||||
tdSql.execute(cmd)
|
||||
|
@ -110,7 +111,8 @@ class TDTestRetetion:
|
|||
|
||||
def stop(self):
|
||||
os.system("sudo timedatectl set-ntp true")
|
||||
time.sleep(40)
|
||||
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1)))
|
||||
time.sleep(5)
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
|
|
@ -1,18 +1,18 @@
|
|||
|
||||
|
||||
# update
|
||||
#python3 ./test.py -f update/allow_update.py
|
||||
python3 ./test.py -f update/allow_update.py
|
||||
python3 ./test.py -f update/allow_update-0.py
|
||||
python3 ./test.py -f update/append_commit_data.py
|
||||
python3 ./test.py -f update/append_commit_last-0.py
|
||||
python3 ./test.py -f update/append_commit_last.py
|
||||
#python3 ./test.py -f update/merge_commit_data.py
|
||||
#python3 ./test.py -f update/merge_commit_data-0.py
|
||||
#python3 ./test.py -f update/merge_commit_data2.py
|
||||
#python3 ./test.py -f update/merge_commit_data2_update0.py
|
||||
#python3 ./test.py -f update/merge_commit_last-0.py
|
||||
#python3 ./test.py -f update/merge_commit_last.py
|
||||
#python3 ./test.py -f update/bug_td2279.py
|
||||
python3 ./test.py -f update/merge_commit_data.py
|
||||
python3 ./test.py -f update/merge_commit_data-0.py
|
||||
python3 ./test.py -f update/merge_commit_data2.py
|
||||
python3 ./test.py -f update/merge_commit_data2_update0.py
|
||||
python3 ./test.py -f update/merge_commit_last-0.py
|
||||
python3 ./test.py -f update/merge_commit_last.py
|
||||
python3 ./test.py -f update/bug_td2279.py
|
||||
|
||||
# wal
|
||||
python3 ./test.py -f wal/addOldWalTest.py
|
||||
|
|
Loading…
Reference in New Issue