more code
This commit is contained in:
parent
5b1492cb4b
commit
014bfc4ad4
|
@ -15,16 +15,6 @@
|
||||||
|
|
||||||
#include "tsdbDataFileRW.h"
|
#include "tsdbDataFileRW.h"
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SFDataPtr brinBlkPtr[1];
|
|
||||||
SFDataPtr rsrvd[2];
|
|
||||||
} SHeadFooter;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SFDataPtr tombBlkPtr[1];
|
|
||||||
SFDataPtr rsrvd[2];
|
|
||||||
} STombFooter;
|
|
||||||
|
|
||||||
extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||||
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
|
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
|
||||||
extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr,
|
extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr,
|
||||||
|
@ -1144,15 +1134,19 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer) {
|
||||||
|
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
|
||||||
|
if (code) return code;
|
||||||
|
*fileSize += sizeof(*footer);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
|
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
int32_t ftype = TSDB_FTYPE_HEAD;
|
code = tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size, writer->headFooter);
|
||||||
code = tsdbWriteFile(writer->fd[ftype], writer->files[ftype].size, (const uint8_t *)writer->headFooter,
|
|
||||||
sizeof(SHeadFooter));
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
writer->files[ftype].size += sizeof(SHeadFooter);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -29,6 +29,16 @@ typedef TARRAY2(SBlockIdx) TBlockIdxArray;
|
||||||
typedef TARRAY2(SDataBlk) TDataBlkArray;
|
typedef TARRAY2(SDataBlk) TDataBlkArray;
|
||||||
typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray;
|
typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SFDataPtr brinBlkPtr[1];
|
||||||
|
SFDataPtr rsrvd[2];
|
||||||
|
} SHeadFooter;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SFDataPtr tombBlkPtr[1];
|
||||||
|
SFDataPtr rsrvd[2];
|
||||||
|
} STombFooter;
|
||||||
|
|
||||||
// SDataFileReader =============================================
|
// SDataFileReader =============================================
|
||||||
typedef struct SDataFileReader SDataFileReader;
|
typedef struct SDataFileReader SDataFileReader;
|
||||||
typedef struct SDataFileReaderConfig {
|
typedef struct SDataFileReaderConfig {
|
||||||
|
|
|
@ -16,106 +16,231 @@
|
||||||
#include "tsdbUpgrade.h"
|
#include "tsdbUpgrade.h"
|
||||||
|
|
||||||
// old
|
// old
|
||||||
extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
||||||
|
extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData);
|
||||||
|
|
||||||
// new
|
// new
|
||||||
extern int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
extern int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
||||||
extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||||
|
extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||||
|
TBrinBlkArray *brinBlkArray, uint8_t **bufArr);
|
||||||
|
extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||||
|
extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);
|
||||||
|
|
||||||
|
static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
struct {
|
||||||
|
// config
|
||||||
|
int32_t maxRow;
|
||||||
|
int8_t cmprAlg;
|
||||||
|
int32_t szPage;
|
||||||
|
uint8_t *bufArr[8];
|
||||||
|
// reader
|
||||||
|
SArray *aBlockIdx;
|
||||||
|
SMapData mDataBlk[1];
|
||||||
|
SBlockData blockData[1];
|
||||||
|
// writer
|
||||||
|
STsdbFD *fd;
|
||||||
|
SBrinBlock brinBlock[1];
|
||||||
|
TBrinBlkArray brinBlkArray[1];
|
||||||
|
SHeadFooter footer[1];
|
||||||
|
} ctx[1] = {{
|
||||||
|
.maxRow = tsdb->pVnode->config.tsdbCfg.maxRows,
|
||||||
|
.cmprAlg = tsdb->pVnode->config.tsdbCfg.compression,
|
||||||
|
.szPage = tsdb->pVnode->config.tsdbPageSize,
|
||||||
|
}};
|
||||||
|
|
||||||
|
if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbReadBlockIdx(reader, ctx->aBlockIdx);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (taosArrayGetSize(ctx->aBlockIdx) == 0) {
|
||||||
|
goto _exit;
|
||||||
|
} else {
|
||||||
|
STFile file = {
|
||||||
|
.type = TSDB_FTYPE_HEAD,
|
||||||
|
.did = pDFileSet->diskId,
|
||||||
|
.fid = fset->fid,
|
||||||
|
.cid = pDFileSet->pHeadF->commitID,
|
||||||
|
.size = pDFileSet->pHeadF->size,
|
||||||
|
};
|
||||||
|
|
||||||
|
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_HEAD]);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
// open fd
|
||||||
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
tsdbTFileName(tsdb, &file, fname);
|
||||||
|
|
||||||
|
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) {
|
||||||
|
SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx);
|
||||||
|
|
||||||
|
code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) {
|
||||||
|
SDataBlk dataBlk[1];
|
||||||
|
tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk);
|
||||||
|
|
||||||
|
SBrinRecord record = {
|
||||||
|
.suid = pBlockIdx->suid,
|
||||||
|
.uid = pBlockIdx->uid,
|
||||||
|
.firstKey = dataBlk->minKey.ts,
|
||||||
|
.firstKeyVer = dataBlk->minKey.version,
|
||||||
|
.lastKey = dataBlk->maxKey.ts,
|
||||||
|
.lastKeyVer = dataBlk->maxKey.version,
|
||||||
|
.minVer = dataBlk->minVer,
|
||||||
|
.maxVer = dataBlk->maxVer,
|
||||||
|
.blockOffset = dataBlk->aSubBlock->offset,
|
||||||
|
.smaOffset = dataBlk->smaInfo.offset,
|
||||||
|
.blockSize = dataBlk->aSubBlock->szBlock,
|
||||||
|
.blockKeySize = dataBlk->aSubBlock->szKey,
|
||||||
|
.smaSize = dataBlk->smaInfo.size,
|
||||||
|
.numRow = dataBlk->nRow,
|
||||||
|
.count = dataBlk->nRow,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (dataBlk->hasDup) {
|
||||||
|
code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
record.count = 1;
|
||||||
|
for (int32_t i = 1; i < ctx->blockData->nRow; ++i) {
|
||||||
|
if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) {
|
||||||
|
record.count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tBrinBlockPut(ctx->brinBlock, &record);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) {
|
||||||
|
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||||
|
ctx->brinBlkArray, ctx->bufArr);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) {
|
||||||
|
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||||
|
ctx->brinBlkArray, ctx->bufArr);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
code =
|
||||||
|
tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr, &fset->farr[TSDB_FTYPE_HEAD]->f->size);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbFsyncFile(ctx->fd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
tsdbCloseFile(&ctx->fd);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
TARRAY2_DESTROY(ctx->brinBlkArray, NULL);
|
||||||
|
tBrinBlockDestroy(ctx->brinBlock);
|
||||||
|
tBlockDataDestroy(ctx->blockData);
|
||||||
|
tMapDataClear(ctx->mDataBlk);
|
||||||
|
taosArrayDestroy(ctx->aBlockIdx);
|
||||||
|
for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); ++i) {
|
||||||
|
tFree(ctx->bufArr[i]);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) {
|
static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
SDataFReader *reader;
|
SDataFReader *reader;
|
||||||
|
STFileSet *fset;
|
||||||
|
|
||||||
|
code = tsdbTFileSetInit(pDFileSet->fid, &fset);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet);
|
code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// .head
|
// .head
|
||||||
{
|
code = tsdbUpgradeHead(tsdb, pDFileSet, reader, fset);
|
||||||
SArray *aBlockIdx = NULL;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
SMapData mDataBlk[1] = {0};
|
|
||||||
SBrinBlock brinBlock[1] = {0};
|
|
||||||
TBrinBlkArray brinBlkArray[1] = {0};
|
|
||||||
|
|
||||||
if ((aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbReadBlockIdx(reader, aBlockIdx);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) {
|
|
||||||
SBlockIdx *pBlockIdx = taosArrayGet(aBlockIdx, i);
|
|
||||||
|
|
||||||
code = tsdbReadDataBlk(reader, pBlockIdx, mDataBlk);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < mDataBlk->nItem; ++j) {
|
|
||||||
SDataBlk dataBlk[1];
|
|
||||||
|
|
||||||
tMapDataGetItemByIdx(mDataBlk, j, dataBlk, tGetDataBlk);
|
|
||||||
|
|
||||||
SBrinRecord record = {
|
|
||||||
.suid = pBlockIdx->suid,
|
|
||||||
.uid = pBlockIdx->uid,
|
|
||||||
.firstKey = dataBlk->minKey.ts,
|
|
||||||
.firstKeyVer = dataBlk->minKey.version,
|
|
||||||
.lastKey = dataBlk->maxKey.ts,
|
|
||||||
.lastKeyVer = dataBlk->maxKey.version,
|
|
||||||
.minVer = dataBlk->minVer,
|
|
||||||
.maxVer = dataBlk->maxVer,
|
|
||||||
.blockOffset = dataBlk->aSubBlock->offset,
|
|
||||||
.smaOffset = dataBlk->smaInfo.offset,
|
|
||||||
.blockSize = dataBlk->aSubBlock->szBlock,
|
|
||||||
.blockKeySize = dataBlk->aSubBlock->szKey,
|
|
||||||
.smaSize = dataBlk->smaInfo.size,
|
|
||||||
.numRow = dataBlk->nRow,
|
|
||||||
.count = dataBlk->nRow,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (dataBlk->hasDup) {
|
|
||||||
ASSERT(0);
|
|
||||||
// TODO: need to get count
|
|
||||||
// record.count = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tBrinBlockPut(brinBlock, &record);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (BRIN_BLOCK_SIZE(brinBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
|
|
||||||
// TODO
|
|
||||||
tBrinBlockClear(brinBlock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (BRIN_BLOCK_SIZE(brinBlock) > 0) {
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
|
|
||||||
TARRAY2_DESTROY(brinBlkArray, NULL);
|
|
||||||
tBrinBlockDestroy(brinBlock);
|
|
||||||
taosArrayDestroy(aBlockIdx);
|
|
||||||
tMapDataClear(mDataBlk);
|
|
||||||
}
|
|
||||||
|
|
||||||
// .data
|
// .data
|
||||||
|
code = tsdbUpgradeData(tsdb, pDFileSet, reader, fset);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// .sma
|
// .sma
|
||||||
|
code = tsdbUpgradeSma(tsdb, pDFileSet, reader, fset);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// .stt
|
// .stt
|
||||||
for (int32_t i = 0; i < pDFileSet->nSttF; ++i) {
|
if (pDFileSet->nSttF > 0) {
|
||||||
// TODO
|
code = tsdbUpgradeStt(tsdb, pDFileSet, reader, fset);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDataFReaderClose(&reader);
|
tsdbDataFReaderClose(&reader);
|
||||||
|
|
||||||
|
code = TARRAY2_APPEND(fileSetArray, fset);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
@ -258,7 +383,30 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, TFileSetArray *fileSetArray) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// upgrade each file set
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) {
|
||||||
|
code = tsdbUpgradeFileSet(tsdb, taosArrayGet(tsdb->fs.aDFileSet, i), fileSetArray);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// upgrade tomb file
|
||||||
|
if (tsdb->fs.pDelFile != NULL) {
|
||||||
|
code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -268,19 +416,8 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||||
code = tsdbFSOpen(tsdb, rollback);
|
code = tsdbFSOpen(tsdb, rollback);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// upgrade each file set
|
code = tsdbDoUpgradeFileSystem(tsdb, fileSetArray);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i);
|
|
||||||
|
|
||||||
code = tsdbUpgradeFileSet(tsdb, pDFileSet, fileSetArray);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
// upgrade tomb file
|
|
||||||
if (tsdb->fs.pDelFile != NULL) {
|
|
||||||
code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
// close file system
|
// close file system
|
||||||
code = tsdbFSClose(tsdb);
|
code = tsdbFSClose(tsdb);
|
||||||
|
@ -292,13 +429,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||||
code = save_fs(fileSetArray, fname);
|
code = save_fs(fileSetArray, fname);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// clear
|
|
||||||
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
}
|
}
|
||||||
|
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,7 +443,7 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||||
tsdbGetCurrentFName(tsdb, fname, NULL);
|
tsdbGetCurrentFName(tsdb, fname, NULL);
|
||||||
if (!taosCheckExistFile(fname)) return 0;
|
if (!taosCheckExistFile(fname)) return 0;
|
||||||
|
|
||||||
int32_t code = tsdbDoUpgradeFileSystem(tsdb, rollback);
|
int32_t code = tsdbUpgradeFileSystem(tsdb, rollback);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "tsdbDataFileRW.h"
|
||||||
#include "tsdbDef.h"
|
#include "tsdbDef.h"
|
||||||
#include "tsdbFS2.h"
|
#include "tsdbFS2.h"
|
||||||
#include "tsdbUtil2.h"
|
#include "tsdbUtil2.h"
|
||||||
|
|
Loading…
Reference in New Issue