fix: upgrade bug
This commit is contained in:
parent
8384b6ce29
commit
2e88ad2c14
|
@ -22,11 +22,14 @@ int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbSttLvlClearFObj(void *data) { tsdbTFileObjUnref(*(STFileObj **)data); }
|
static void tsdbSttLvlClearFObj(void *data) { tsdbTFileObjUnref(*(STFileObj **)data); }
|
||||||
static int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
|
||||||
TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlClearFObj);
|
int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
||||||
taosMemoryFree(lvl[0]);
|
if (lvl[0] != NULL) {
|
||||||
lvl[0] = NULL;
|
TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlClearFObj);
|
||||||
|
taosMemoryFree(lvl[0]);
|
||||||
|
lvl[0] = NULL;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t
|
||||||
extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||||
extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);
|
extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);
|
||||||
extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
|
extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
|
||||||
|
extern int32_t tsdbSttLvlClear(SSttLvl **lvl);
|
||||||
extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||||
extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize);
|
extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize);
|
||||||
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||||
|
@ -38,6 +39,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// init
|
||||||
struct {
|
struct {
|
||||||
// config
|
// config
|
||||||
int32_t maxRow;
|
int32_t maxRow;
|
||||||
|
@ -59,6 +61,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
||||||
.szPage = tsdb->pVnode->config.tsdbPageSize,
|
.szPage = tsdb->pVnode->config.tsdbPageSize,
|
||||||
}};
|
}};
|
||||||
|
|
||||||
|
// read SBlockIdx array
|
||||||
if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -67,9 +70,8 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
||||||
code = tsdbReadBlockIdx(reader, ctx->aBlockIdx);
|
code = tsdbReadBlockIdx(reader, ctx->aBlockIdx);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (taosArrayGetSize(ctx->aBlockIdx) == 0) {
|
if (taosArrayGetSize(ctx->aBlockIdx) > 0) {
|
||||||
goto _exit;
|
// init/open file fd
|
||||||
} else {
|
|
||||||
STFile file = {
|
STFile file = {
|
||||||
.type = TSDB_FTYPE_HEAD,
|
.type = TSDB_FTYPE_HEAD,
|
||||||
.did = pDFileSet->diskId,
|
.did = pDFileSet->diskId,
|
||||||
|
@ -87,77 +89,78 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
||||||
|
|
||||||
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) {
|
// convert
|
||||||
SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx);
|
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) {
|
||||||
|
SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx);
|
||||||
|
|
||||||
code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk);
|
code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) {
|
|
||||||
SDataBlk dataBlk[1];
|
|
||||||
tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk);
|
|
||||||
|
|
||||||
SBrinRecord record = {
|
|
||||||
.suid = pBlockIdx->suid,
|
|
||||||
.uid = pBlockIdx->uid,
|
|
||||||
.firstKey = dataBlk->minKey.ts,
|
|
||||||
.firstKeyVer = dataBlk->minKey.version,
|
|
||||||
.lastKey = dataBlk->maxKey.ts,
|
|
||||||
.lastKeyVer = dataBlk->maxKey.version,
|
|
||||||
.minVer = dataBlk->minVer,
|
|
||||||
.maxVer = dataBlk->maxVer,
|
|
||||||
.blockOffset = dataBlk->aSubBlock->offset,
|
|
||||||
.smaOffset = dataBlk->smaInfo.offset,
|
|
||||||
.blockSize = dataBlk->aSubBlock->szBlock,
|
|
||||||
.blockKeySize = dataBlk->aSubBlock->szKey,
|
|
||||||
.smaSize = dataBlk->smaInfo.size,
|
|
||||||
.numRow = dataBlk->nRow,
|
|
||||||
.count = dataBlk->nRow,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (dataBlk->hasDup) {
|
|
||||||
code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
record.count = 1;
|
|
||||||
for (int32_t i = 1; i < ctx->blockData->nRow; ++i) {
|
|
||||||
if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) {
|
|
||||||
record.count++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tBrinBlockPut(ctx->brinBlock, &record);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) {
|
for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) {
|
||||||
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
SDataBlk dataBlk[1];
|
||||||
ctx->brinBlkArray, ctx->bufArr);
|
tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk);
|
||||||
|
|
||||||
|
SBrinRecord record = {
|
||||||
|
.suid = pBlockIdx->suid,
|
||||||
|
.uid = pBlockIdx->uid,
|
||||||
|
.firstKey = dataBlk->minKey.ts,
|
||||||
|
.firstKeyVer = dataBlk->minKey.version,
|
||||||
|
.lastKey = dataBlk->maxKey.ts,
|
||||||
|
.lastKeyVer = dataBlk->maxKey.version,
|
||||||
|
.minVer = dataBlk->minVer,
|
||||||
|
.maxVer = dataBlk->maxVer,
|
||||||
|
.blockOffset = dataBlk->aSubBlock->offset,
|
||||||
|
.smaOffset = dataBlk->smaInfo.offset,
|
||||||
|
.blockSize = dataBlk->aSubBlock->szBlock,
|
||||||
|
.blockKeySize = dataBlk->aSubBlock->szKey,
|
||||||
|
.smaSize = dataBlk->smaInfo.size,
|
||||||
|
.numRow = dataBlk->nRow,
|
||||||
|
.count = dataBlk->nRow,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (dataBlk->hasDup) {
|
||||||
|
code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
record.count = 1;
|
||||||
|
for (int32_t i = 1; i < ctx->blockData->nRow; ++i) {
|
||||||
|
if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) {
|
||||||
|
record.count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tBrinBlockPut(ctx->brinBlock, &record);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) {
|
||||||
|
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||||
|
ctx->brinBlkArray, ctx->bufArr);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) {
|
if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) {
|
||||||
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||||
ctx->brinBlkArray, ctx->bufArr);
|
ctx->brinBlkArray, ctx->bufArr);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr,
|
||||||
|
&fset->farr[TSDB_FTYPE_HEAD]->f->size);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbFsyncFile(ctx->fd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
tsdbCloseFile(&ctx->fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
code =
|
|
||||||
tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr, &fset->farr[TSDB_FTYPE_HEAD]->f->size);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbFsyncFile(ctx->fd);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
tsdbCloseFile(&ctx->fd);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
@ -313,8 +316,12 @@ static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *r
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = TARRAY2_APPEND(fset->lvlArr, lvl);
|
if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
code = TARRAY2_APPEND(fset->lvlArr, lvl);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
} else {
|
||||||
|
tsdbSttLvlClear(&lvl);
|
||||||
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
Loading…
Reference in New Issue