fix two review bugs
This commit is contained in:
parent
89d9be4d78
commit
291aaff1c6
|
@ -312,11 +312,11 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
||||||
// Skip expired memory data and expired FSET
|
// Skip expired memory data and expired FSET
|
||||||
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
|
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
|
||||||
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
|
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
|
||||||
if (pSet->fid >= commith.rtn.minFid) {
|
if (pSet->fid < commith.rtn.minFid) {
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
|
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
|
||||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -427,14 +427,18 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||||
if (pIter->pTable == NULL) continue;
|
if (pIter->pTable == NULL) continue;
|
||||||
|
|
||||||
if (tsdbCommitToTable(pCommith, tid) < 0) {
|
if (tsdbCommitToTable(pCommith, tid) < 0) {
|
||||||
// TODO: revert the file change
|
|
||||||
tsdbCloseCommitFile(pCommith, true);
|
tsdbCloseCommitFile(pCommith, true);
|
||||||
|
// revert the file change
|
||||||
|
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbWriteBlockIdx(pCommith) < 0) {
|
if (tsdbWriteBlockIdx(pCommith) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
tsdbCloseCommitFile(pCommith, true);
|
tsdbCloseCommitFile(pCommith, true);
|
||||||
|
// revert the file change
|
||||||
|
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -688,7 +692,11 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
||||||
|
|
||||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||||
|
|
||||||
if (tsdbWriteBlockInfo(pCommith) < 0) return -1;
|
if (tsdbWriteBlockInfo(pCommith) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", REPO_ID(pRepo),
|
||||||
|
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -940,6 +948,8 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) {
|
||||||
|
|
||||||
if (nidx <= 0) {
|
if (nidx <= 0) {
|
||||||
// All data are deleted
|
// All data are deleted
|
||||||
|
pHeadf->info.offset = 0;
|
||||||
|
pHeadf->info.len = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1241,7 +1251,6 @@ static void tsdbResetCommitFile(SCommitH *pCommith) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbResetCommitTable(SCommitH *pCommith) {
|
static void tsdbResetCommitTable(SCommitH *pCommith) {
|
||||||
tdResetDataCols(pCommith->pDataCols);
|
|
||||||
taosArrayClear(pCommith->aSubBlk);
|
taosArrayClear(pCommith->aSubBlk);
|
||||||
taosArrayClear(pCommith->aSupBlk);
|
taosArrayClear(pCommith->aSupBlk);
|
||||||
pCommith->pTable = NULL;
|
pCommith->pTable = NULL;
|
||||||
|
@ -1270,6 +1279,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet),
|
||||||
|
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
|
||||||
} else {
|
} else {
|
||||||
pCommith->isRFileSet = false;
|
pCommith->isRFileSet = false;
|
||||||
}
|
}
|
||||||
|
@ -1280,6 +1292,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
|
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
|
||||||
|
|
||||||
if (tsdbCreateDFileSet(pWSet, true) < 0) {
|
if (tsdbCreateDFileSet(pWSet, true) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
|
||||||
|
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
|
||||||
if (pCommith->isRFileSet) {
|
if (pCommith->isRFileSet) {
|
||||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||||
}
|
}
|
||||||
|
@ -1288,6 +1302,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
|
|
||||||
pCommith->isDFileSame = false;
|
pCommith->isDFileSame = false;
|
||||||
pCommith->isLFileSame = false;
|
pCommith->isLFileSame = false;
|
||||||
|
|
||||||
|
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
|
||||||
|
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
|
||||||
} else {
|
} else {
|
||||||
did.level = TSDB_FSET_LEVEL(pSet);
|
did.level = TSDB_FSET_LEVEL(pSet);
|
||||||
did.id = TSDB_FSET_ID(pSet);
|
did.id = TSDB_FSET_ID(pSet);
|
||||||
|
@ -1299,6 +1316,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
||||||
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
|
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
|
||||||
if (tsdbCreateDFile(pWHeadf, true) < 0) {
|
if (tsdbCreateDFile(pWHeadf, true) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
|
||||||
|
tstrerror(terrno));
|
||||||
|
|
||||||
if (pCommith->isRFileSet) {
|
if (pCommith->isRFileSet) {
|
||||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1310,7 +1330,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
|
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||||
tsdbInitDFileEx(pWDataf, pRDataf);
|
tsdbInitDFileEx(pWDataf, pRDataf);
|
||||||
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
|
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
|
||||||
tsdbCloseDFile(pWHeadf);
|
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
|
||||||
|
tstrerror(terrno));
|
||||||
|
|
||||||
|
tsdbCloseDFileSet(pWSet);
|
||||||
tsdbRemoveDFile(pWHeadf);
|
tsdbRemoveDFile(pWHeadf);
|
||||||
if (pCommith->isRFileSet) {
|
if (pCommith->isRFileSet) {
|
||||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||||
|
@ -1327,6 +1350,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
pCommith->isLFileSame = true;
|
pCommith->isLFileSame = true;
|
||||||
|
|
||||||
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
|
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
|
||||||
|
tstrerror(terrno));
|
||||||
|
|
||||||
tsdbCloseDFileSet(pWSet);
|
tsdbCloseDFileSet(pWSet);
|
||||||
tsdbRemoveDFile(pWHeadf);
|
tsdbRemoveDFile(pWHeadf);
|
||||||
if (pCommith->isRFileSet) {
|
if (pCommith->isRFileSet) {
|
||||||
|
@ -1339,6 +1365,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
||||||
pCommith->isLFileSame = false;
|
pCommith->isLFileSame = false;
|
||||||
|
|
||||||
if (tsdbCreateDFile(pWLastf, true) < 0) {
|
if (tsdbCreateDFile(pWLastf, true) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
|
||||||
|
tstrerror(terrno));
|
||||||
|
|
||||||
tsdbCloseDFileSet(pWSet);
|
tsdbCloseDFileSet(pWSet);
|
||||||
tsdbRemoveDFile(pWHeadf);
|
tsdbRemoveDFile(pWHeadf);
|
||||||
if (pCommith->isRFileSet) {
|
if (pCommith->isRFileSet) {
|
||||||
|
@ -1374,7 +1403,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
|
||||||
if (pBlock->last) {
|
if (pBlock->last) {
|
||||||
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
|
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
|
||||||
} else {
|
} else {
|
||||||
if (mergeRows < pCfg->maxRowsPerFileBlock) return true;
|
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue