From 5b1492cb4b6fdc626b5f805cf7356be8f96634f3 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 25 Jun 2023 15:09:15 +0800 Subject: [PATCH 1/8] refact code --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 172 ++++++------- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 252 +++++++++++-------- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 13 +- 3 files changed, 230 insertions(+), 207 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index a1c1de3d73..47d46ed6e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -25,6 +25,11 @@ typedef struct { SFDataPtr rsrvd[2]; } STombFooter; +extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, + int64_t *fileSize); + // SDataFileReader ============================================= struct SDataFileReader { SDataFileReaderConfig config[1]; @@ -644,81 +649,89 @@ _exit: return code; } -static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { - if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0; +int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, + TBrinBlkArray *brinBlkArray, uint8_t **bufArr) { + if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0; - int32_t code = 0; - int32_t lino = 0; + int32_t code; // get SBrinBlk SBrinBlk brinBlk[1] = { { .dp[0] = { - .offset = writer->files[TSDB_FTYPE_HEAD].size, + .offset = *fileSize, .size = 0, }, .minTbid = { - .suid = TARRAY2_FIRST(writer->brinBlock->suid), - .uid = TARRAY2_FIRST(writer->brinBlock->uid), + .suid = TARRAY2_FIRST(brinBlock->suid), + .uid = TARRAY2_FIRST(brinBlock->uid), }, .maxTbid = { - .suid = TARRAY2_LAST(writer->brinBlock->suid), - .uid = TARRAY2_LAST(writer->brinBlock->uid), + .suid = TARRAY2_LAST(brinBlock->suid), + .uid = TARRAY2_LAST(brinBlock->uid), }, - .minVer = TARRAY2_FIRST(writer->brinBlock->minVer), - .maxVer = TARRAY2_FIRST(writer->brinBlock->minVer), - .numRec = BRIN_BLOCK_SIZE(writer->brinBlock), - .cmprAlg = writer->config->cmprAlg, + .minVer = TARRAY2_FIRST(brinBlock->minVer), + .maxVer = TARRAY2_FIRST(brinBlock->minVer), + .numRec = BRIN_BLOCK_SIZE(brinBlock), + .cmprAlg = cmprAlg, }, }; - for (int32_t i = 1; i < BRIN_BLOCK_SIZE(writer->brinBlock); i++) { - if (brinBlk->minVer > TARRAY2_GET(writer->brinBlock->minVer, i)) { - brinBlk->minVer = TARRAY2_GET(writer->brinBlock->minVer, i); + for (int32_t i = 1; i < BRIN_BLOCK_SIZE(brinBlock); i++) { + if (brinBlk->minVer > TARRAY2_GET(brinBlock->minVer, i)) { + brinBlk->minVer = TARRAY2_GET(brinBlock->minVer, i); } - if (brinBlk->maxVer < TARRAY2_GET(writer->brinBlock->maxVer, i)) { - brinBlk->maxVer = TARRAY2_GET(writer->brinBlock->maxVer, i); + if (brinBlk->maxVer < TARRAY2_GET(brinBlock->maxVer, i)) { + brinBlk->maxVer = TARRAY2_GET(brinBlock->maxVer, i); } } // write to file - for (int32_t i = 0; i < ARRAY_SIZE(writer->brinBlock->dataArr1); i++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr1 + i), - TARRAY2_DATA_LEN(writer->brinBlock->dataArr1 + i), TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, - &writer->config->bufArr[0], 0, &brinBlk->size[i], &writer->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i), + TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[i], &bufArr[1]); + if (code) return code; - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0], - brinBlk->size[i]); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[i]); + if (code) return code; brinBlk->dp->size += brinBlk->size[i]; - writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i]; + *fileSize += brinBlk->size[i]; } - for (int32_t i = 0, j = ARRAY_SIZE(writer->brinBlock->dataArr1); i < ARRAY_SIZE(writer->brinBlock->dataArr2); - i++, j++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr2 + i), - TARRAY2_DATA_LEN(writer->brinBlock->dataArr2 + i), TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, - &writer->config->bufArr[0], 0, &brinBlk->size[j], &writer->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) { + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr2 + i), TARRAY2_DATA_LEN(brinBlock->dataArr2 + i), + TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[j], &bufArr[1]); + if (code) return code; - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0], - brinBlk->size[j]); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[j]); + if (code) return code; brinBlk->dp->size += brinBlk->size[j]; - writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j]; + *fileSize += brinBlk->size[j]; } // append to brinBlkArray - code = TARRAY2_APPEND_PTR(writer->brinBlkArray, brinBlk); - TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND_PTR(brinBlkArray, brinBlk); + if (code) return code; - tBrinBlockClear(writer->brinBlock); + tBrinBlockClear(brinBlock); + + return 0; +} + +static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { + if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg, + &writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -1154,53 +1167,10 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - STombBlk tombBlk[1] = {{ - .numRec = TOMB_BLOCK_SIZE(writer->tombBlock), - .minTbid = - { - .suid = TARRAY2_FIRST(writer->tombBlock->suid), - .uid = TARRAY2_FIRST(writer->tombBlock->uid), - }, - .maxTbid = - { - .suid = TARRAY2_LAST(writer->tombBlock->suid), - .uid = TARRAY2_LAST(writer->tombBlock->uid), - }, - .minVer = TARRAY2_FIRST(writer->tombBlock->version), - .maxVer = TARRAY2_FIRST(writer->tombBlock->version), - .dp[0] = - { - .offset = writer->files[TSDB_FTYPE_TOMB].size, - .size = 0, - }, - }}; - - for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) { - tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tombBlock->version, i)); - tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tombBlock->version, i)); - } - - for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) { - int32_t size; - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]), - TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, - &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size, writer->config->bufArr[0], - size); - TSDB_CHECK_CODE(code, lino, _exit); - - tombBlk->size[i] = size; - tombBlk->dp[0].size += size; - writer->files[TSDB_FTYPE_TOMB].size += size; - } - - code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk); + code = tsdbFileDoWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, + &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); - tTombBlockClear(writer->tombBlock); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -1214,14 +1184,9 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t ftype = TSDB_FTYPE_TOMB; - writer->tombFooter->tombBlkPtr->offset = writer->files[ftype].size; - writer->tombFooter->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray); - - code = tsdbWriteFile(writer->fd[ftype], writer->tombFooter->tombBlkPtr->offset, - (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->tombFooter->tombBlkPtr->size); + code = tsdbFileDoWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr, + &writer->files[TSDB_FTYPE_TOMB].size); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[ftype].size += writer->tombFooter->tombBlkPtr->size; _exit: if (code) { @@ -1306,20 +1271,25 @@ _exit: return code; } -static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) { - ASSERT(TARRAY2_SIZE(writer->brinBlkArray) > 0); +int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize) { + ASSERT(TARRAY2_SIZE(brinBlkArray) > 0); + ptr->offset = *fileSize; + ptr->size = TARRAY2_DATA_LEN(brinBlkArray); + int32_t code = tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size); + if (code) return code; + + *fileSize += ptr->size; + return 0; +} + +static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t ftype = TSDB_FTYPE_HEAD; - writer->headFooter->brinBlkPtr->offset = writer->files[ftype].size; - writer->headFooter->brinBlkPtr->size = TARRAY2_DATA_LEN(writer->brinBlkArray); - - code = tsdbWriteFile(writer->fd[ftype], writer->headFooter->brinBlkPtr->offset, - (uint8_t *)TARRAY2_DATA(writer->brinBlkArray), writer->headFooter->brinBlkPtr->size); + code = tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray, writer->headFooter->brinBlkPtr, + &writer->files[TSDB_FTYPE_HEAD].size); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[ftype].size += writer->headFooter->brinBlkPtr->size; _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 97a2a8b478..326550bac2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -402,51 +402,64 @@ struct SSttFileWriter { uint8_t *bufArr[5]; }; +int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize, + TSttBlkArray *sttBlkArray, uint8_t **bufArr) { + if (blockData->nRow == 0) return 0; + + int32_t code = 0; + + SSttBlk sttBlk[1] = {{ + .suid = blockData->suid, + .minUid = blockData->uid ? blockData->uid : blockData->aUid[0], + .maxUid = blockData->uid ? blockData->uid : blockData->aUid[blockData->nRow - 1], + .minKey = blockData->aTSKEY[0], + .maxKey = blockData->aTSKEY[0], + .minVer = blockData->aVersion[0], + .maxVer = blockData->aVersion[0], + .nRow = blockData->nRow, + }}; + + for (int32_t iRow = 1; iRow < blockData->nRow; iRow++) { + if (sttBlk->minKey > blockData->aTSKEY[iRow]) sttBlk->minKey = blockData->aTSKEY[iRow]; + if (sttBlk->maxKey < blockData->aTSKEY[iRow]) sttBlk->maxKey = blockData->aTSKEY[iRow]; + if (sttBlk->minVer > blockData->aVersion[iRow]) sttBlk->minVer = blockData->aVersion[iRow]; + if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow]; + } + + int32_t sizeArr[5] = {0}; + code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr); + if (code) return code; + + sttBlk->bInfo.offset = *fileSize; + sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3]; + sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey; + + for (int32_t i = 3; i >= 0; i--) { + if (sizeArr[i]) { + code = tsdbWriteFile(fd, *fileSize, bufArr[i], sizeArr[i]); + if (code) return code; + *fileSize += sizeArr[i]; + } + } + + code = TARRAY2_APPEND_PTR(sttBlkArray, sttBlk); + if (code) return code; + + tBlockDataClear(blockData); + + return 0; +} + static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) { if (writer->blockData->nRow == 0) return 0; int32_t code = 0; int32_t lino = 0; - SSttBlk sttBlk[1] = {{ - .suid = writer->blockData->suid, - .minUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[0], - .maxUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[writer->blockData->nRow - 1], - .minKey = writer->blockData->aTSKEY[0], - .maxKey = writer->blockData->aTSKEY[0], - .minVer = writer->blockData->aVersion[0], - .maxVer = writer->blockData->aVersion[0], - .nRow = writer->blockData->nRow, - }}; - - for (int32_t iRow = 1; iRow < writer->blockData->nRow; iRow++) { - if (sttBlk->minKey > writer->blockData->aTSKEY[iRow]) sttBlk->minKey = writer->blockData->aTSKEY[iRow]; - if (sttBlk->maxKey < writer->blockData->aTSKEY[iRow]) sttBlk->maxKey = writer->blockData->aTSKEY[iRow]; - if (sttBlk->minVer > writer->blockData->aVersion[iRow]) sttBlk->minVer = writer->blockData->aVersion[iRow]; - if (sttBlk->maxVer < writer->blockData->aVersion[iRow]) sttBlk->maxVer = writer->blockData->aVersion[iRow]; - } - - int32_t sizeArr[5] = {0}; - code = tCmprBlockData(writer->blockData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); + code = tsdbFileDoWriteBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size, + writer->sttBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); - sttBlk->bInfo.offset = writer->file->size; - sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3]; - sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey; - - for (int32_t i = 3; i >= 0; i--) { - if (sizeArr[i]) { - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]); - TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += sizeArr[i]; - } - } - - code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataClear(writer->blockData); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -516,61 +529,72 @@ _exit: return code; } +int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr) { + int32_t code; + + if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; + + STombBlk tombBlk[1] = {{ + .dp[0] = + { + .offset = *fileSize, + .size = 0, + }, + .minTbid = + { + .suid = TARRAY2_FIRST(tombBlock->suid), + .uid = TARRAY2_FIRST(tombBlock->uid), + }, + .maxTbid = + { + .suid = TARRAY2_LAST(tombBlock->suid), + .uid = TARRAY2_LAST(tombBlock->uid), + }, + .minVer = TARRAY2_FIRST(tombBlock->version), + .maxVer = TARRAY2_FIRST(tombBlock->version), + .numRec = TOMB_BLOCK_SIZE(tombBlock), + .cmprAlg = cmprAlg, + }}; + + for (int32_t i = 1; i < TOMB_BLOCK_SIZE(tombBlock); i++) { + if (tombBlk->minVer > TARRAY2_GET(tombBlock->version, i)) { + tombBlk->minVer = TARRAY2_GET(tombBlock->version, i); + } + if (tombBlk->maxVer < TARRAY2_GET(tombBlock->version, i)) { + tombBlk->maxVer = TARRAY2_GET(tombBlock->version, i); + } + } + + for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) { + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]), + TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]); + if (code) return code; + + code = tsdbWriteFile(fd, *fileSize, bufArr[0], tombBlk->size[i]); + if (code) return code; + + tombBlk->dp->size += tombBlk->size[i]; + *fileSize += tombBlk->size[i]; + } + + code = TARRAY2_APPEND_PTR(tombBlkArray, tombBlk); + if (code) return code; + + tTombBlockClear(tombBlock); + return 0; +} + static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0; int32_t code = 0; int32_t lino = 0; - STombBlk tombBlk[1] = {{ - .dp[0] = - { - .offset = writer->file->size, - .size = 0, - }, - .minTbid = - { - .suid = TARRAY2_FIRST(writer->tombBlock->suid), - .uid = TARRAY2_FIRST(writer->tombBlock->uid), - }, - .maxTbid = - { - .suid = TARRAY2_LAST(writer->tombBlock->suid), - .uid = TARRAY2_LAST(writer->tombBlock->uid), - }, - .minVer = TARRAY2_FIRST(writer->tombBlock->version), - .maxVer = TARRAY2_FIRST(writer->tombBlock->version), - .numRec = TOMB_BLOCK_SIZE(writer->tombBlock), - .cmprAlg = writer->config->cmprAlg, - }}; - - for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) { - if (tombBlk->minVer > TARRAY2_GET(writer->tombBlock->version, i)) { - tombBlk->minVer = TARRAY2_GET(writer->tombBlock->version, i); - } - if (tombBlk->maxVer < TARRAY2_GET(writer->tombBlock->version, i)) { - tombBlk->maxVer = TARRAY2_GET(writer->tombBlock->version, i); - } - } - - for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]), - TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, - &writer->config->bufArr[0], 0, &tombBlk->size[i], &writer->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], tombBlk->size[i]); - TSDB_CHECK_CODE(code, lino, _exit); - - tombBlk->dp->size += tombBlk->size[i]; - writer->file->size += tombBlk->size[i]; - } - - code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk); + code = tsdbFileDoWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, + writer->tombBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); - tTombBlockClear(writer->tombBlock); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); @@ -578,18 +602,27 @@ _exit: return code; } +int32_t tsdbFileDoWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) { + ptr->size = TARRAY2_DATA_LEN(sttBlkArray); + if (ptr->size > 0) { + ptr->offset = *fileSize; + + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(sttBlkArray), ptr->size); + if (code) { + return code; + } + + *fileSize += ptr->size; + } + return 0; +} + static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray); - if (writer->footer->sttBlkPtr->size) { - writer->footer->sttBlkPtr->offset = writer->file->size; - code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray), - writer->footer->sttBlkPtr->size); - TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += writer->footer->sttBlkPtr->size; - } + code = tsdbFileDoWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -618,18 +651,27 @@ _exit: return code; } +int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) { + ptr->size = TARRAY2_DATA_LEN(tombBlkArray); + if (ptr->size > 0) { + ptr->offset = *fileSize; + + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size); + if (code) { + return code; + } + + *fileSize += ptr->size; + } + return 0; +} + static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray); - if (writer->footer->tombBlkPtr->size) { - writer->footer->tombBlkPtr->offset = writer->file->size; - code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), - writer->footer->tombBlkPtr->size); - TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += writer->footer->tombBlkPtr->size; - } + code = tsdbFileDoWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -638,13 +680,17 @@ _exit: return code; } -static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { - int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer)); +int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) { + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); if (code) return code; - writer->file->size += sizeof(writer->footer); + *fileSize += sizeof(*footer); return 0; } +static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { + return tsdbSttFileDoWriteFooterImpl(writer->fd, writer->footer, &writer->file->size); +} + static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index e348e60e74..de78426da1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -202,11 +202,15 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * } if (fd) { + // write footer + + // sync and close code = tsdbFsyncFile(fd); TSDB_CHECK_CODE(code, lino, _exit); - tsdbCloseFile(&fd); } + + // clear TARRAY2_DESTROY(tombBlkArray, NULL); tTombBlockDestroy(tombBlock); taosArrayDestroy(aDelData); @@ -260,10 +264,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { TFileSetArray fileSetArray[1] = {0}; - // load old file system and convert + // open old file system code = tsdbFSOpen(tsdb, rollback); TSDB_CHECK_CODE(code, lino, _exit); + // upgrade each file set for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i); @@ -271,21 +276,23 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { TSDB_CHECK_CODE(code, lino, _exit); } + // upgrade tomb file if (tsdb->fs.pDelFile != NULL) { code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); TSDB_CHECK_CODE(code, lino, _exit); } + // close file system code = tsdbFSClose(tsdb); TSDB_CHECK_CODE(code, lino, _exit); // save new file system char fname[TSDB_FILENAME_LEN]; current_fname(tsdb, fname, TSDB_FCURRENT); - code = save_fs(fileSetArray, fname); TSDB_CHECK_CODE(code, lino, _exit); + // clear TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); _exit: From 014bfc4ad43ce457574b2246f4d70cce072dec1b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 25 Jun 2023 16:27:14 +0800 Subject: [PATCH 2/8] more code --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 22 +- source/dnode/vnode/src/tsdb/tsdbDataFileRW.h | 10 + source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 321 +++++++++++++------ source/dnode/vnode/src/tsdb/tsdbUpgrade.h | 1 + 4 files changed, 247 insertions(+), 107 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 47d46ed6e2..16559c83ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -15,16 +15,6 @@ #include "tsdbDataFileRW.h" -typedef struct { - SFDataPtr brinBlkPtr[1]; - SFDataPtr rsrvd[2]; -} SHeadFooter; - -typedef struct { - SFDataPtr tombBlkPtr[1]; - SFDataPtr rsrvd[2]; -} STombFooter; - extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, TTombBlkArray *tombBlkArray, uint8_t **bufArr); extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, @@ -1144,15 +1134,19 @@ _exit: return code; } +int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer) { + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); + if (code) return code; + *fileSize += sizeof(*footer); + return 0; +} + static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - int32_t ftype = TSDB_FTYPE_HEAD; - code = tsdbWriteFile(writer->fd[ftype], writer->files[ftype].size, (const uint8_t *)writer->headFooter, - sizeof(SHeadFooter)); + code = tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size, writer->headFooter); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[ftype].size += sizeof(SHeadFooter); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h index bd89cd7e2d..827b58fb4a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h @@ -29,6 +29,16 @@ typedef TARRAY2(SBlockIdx) TBlockIdxArray; typedef TARRAY2(SDataBlk) TDataBlkArray; typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray; +typedef struct { + SFDataPtr brinBlkPtr[1]; + SFDataPtr rsrvd[2]; +} SHeadFooter; + +typedef struct { + SFDataPtr tombBlkPtr[1]; + SFDataPtr rsrvd[2]; +} STombFooter; + // SDataFileReader ============================================= typedef struct SDataFileReader SDataFileReader; typedef struct SDataFileReaderConfig { diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index de78426da1..83db3717ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -16,106 +16,231 @@ #include "tsdbUpgrade.h" // old -extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); +extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); +extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData); // new extern int32_t save_fs(const TFileSetArray *arr, const char *fname); extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); +extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, + TBrinBlkArray *brinBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); +extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); + +static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + struct { + // config + int32_t maxRow; + int8_t cmprAlg; + int32_t szPage; + uint8_t *bufArr[8]; + // reader + SArray *aBlockIdx; + SMapData mDataBlk[1]; + SBlockData blockData[1]; + // writer + STsdbFD *fd; + SBrinBlock brinBlock[1]; + TBrinBlkArray brinBlkArray[1]; + SHeadFooter footer[1]; + } ctx[1] = {{ + .maxRow = tsdb->pVnode->config.tsdbCfg.maxRows, + .cmprAlg = tsdb->pVnode->config.tsdbCfg.compression, + .szPage = tsdb->pVnode->config.tsdbPageSize, + }}; + + if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadBlockIdx(reader, ctx->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(ctx->aBlockIdx) == 0) { + goto _exit; + } else { + STFile file = { + .type = TSDB_FTYPE_HEAD, + .did = pDFileSet->diskId, + .fid = fset->fid, + .cid = pDFileSet->pHeadF->commitID, + .size = pDFileSet->pHeadF->size, + }; + + code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_HEAD]); + TSDB_CHECK_CODE(code, lino, _exit); + + // open fd + char fname[TSDB_FILENAME_LEN]; + tsdbTFileName(tsdb, &file, fname); + + code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) { + SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx); + + code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) { + SDataBlk dataBlk[1]; + tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk); + + SBrinRecord record = { + .suid = pBlockIdx->suid, + .uid = pBlockIdx->uid, + .firstKey = dataBlk->minKey.ts, + .firstKeyVer = dataBlk->minKey.version, + .lastKey = dataBlk->maxKey.ts, + .lastKeyVer = dataBlk->maxKey.version, + .minVer = dataBlk->minVer, + .maxVer = dataBlk->maxVer, + .blockOffset = dataBlk->aSubBlock->offset, + .smaOffset = dataBlk->smaInfo.offset, + .blockSize = dataBlk->aSubBlock->szBlock, + .blockKeySize = dataBlk->aSubBlock->szKey, + .smaSize = dataBlk->smaInfo.size, + .numRow = dataBlk->nRow, + .count = dataBlk->nRow, + }; + + if (dataBlk->hasDup) { + code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData); + TSDB_CHECK_CODE(code, lino, _exit); + + record.count = 1; + for (int32_t i = 1; i < ctx->blockData->nRow; ++i) { + if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) { + record.count++; + } + } + } + + code = tBrinBlockPut(ctx->brinBlock, &record); + TSDB_CHECK_CODE(code, lino, _exit); + + if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) { + code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, + ctx->brinBlkArray, ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + + if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) { + code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, + ctx->brinBlkArray, ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = + tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr, &fset->farr[TSDB_FTYPE_HEAD]->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFsyncFile(ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit); + + tsdbCloseFile(&ctx->fd); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + TARRAY2_DESTROY(ctx->brinBlkArray, NULL); + tBrinBlockDestroy(ctx->brinBlock); + tBlockDataDestroy(ctx->blockData); + tMapDataClear(ctx->mDataBlk); + taosArrayDestroy(ctx->aBlockIdx); + for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); ++i) { + tFree(ctx->bufArr[i]); + } + return code; +} + +static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) { int32_t code = 0; int32_t lino = 0; SDataFReader *reader; + STFileSet *fset; + + code = tsdbTFileSetInit(pDFileSet->fid, &fset); + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet); TSDB_CHECK_CODE(code, lino, _exit); // .head - { - SArray *aBlockIdx = NULL; - SMapData mDataBlk[1] = {0}; - SBrinBlock brinBlock[1] = {0}; - TBrinBlkArray brinBlkArray[1] = {0}; - - if ((aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbReadBlockIdx(reader, aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { - SBlockIdx *pBlockIdx = taosArrayGet(aBlockIdx, i); - - code = tsdbReadDataBlk(reader, pBlockIdx, mDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t j = 0; j < mDataBlk->nItem; ++j) { - SDataBlk dataBlk[1]; - - tMapDataGetItemByIdx(mDataBlk, j, dataBlk, tGetDataBlk); - - SBrinRecord record = { - .suid = pBlockIdx->suid, - .uid = pBlockIdx->uid, - .firstKey = dataBlk->minKey.ts, - .firstKeyVer = dataBlk->minKey.version, - .lastKey = dataBlk->maxKey.ts, - .lastKeyVer = dataBlk->maxKey.version, - .minVer = dataBlk->minVer, - .maxVer = dataBlk->maxVer, - .blockOffset = dataBlk->aSubBlock->offset, - .smaOffset = dataBlk->smaInfo.offset, - .blockSize = dataBlk->aSubBlock->szBlock, - .blockKeySize = dataBlk->aSubBlock->szKey, - .smaSize = dataBlk->smaInfo.size, - .numRow = dataBlk->nRow, - .count = dataBlk->nRow, - }; - - if (dataBlk->hasDup) { - ASSERT(0); - // TODO: need to get count - // record.count = 0; - } - - code = tBrinBlockPut(brinBlock, &record); - TSDB_CHECK_CODE(code, lino, _exit); - - if (BRIN_BLOCK_SIZE(brinBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { - // TODO - tBrinBlockClear(brinBlock); - } - } - } - - if (BRIN_BLOCK_SIZE(brinBlock) > 0) { - // TODO - ASSERT(0); - } - - // TODO - ASSERT(0); - - TARRAY2_DESTROY(brinBlkArray, NULL); - tBrinBlockDestroy(brinBlock); - taosArrayDestroy(aBlockIdx); - tMapDataClear(mDataBlk); - } + code = tsdbUpgradeHead(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); // .data + code = tsdbUpgradeData(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); // .sma + code = tsdbUpgradeSma(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); // .stt - for (int32_t i = 0; i < pDFileSet->nSttF; ++i) { - // TODO + if (pDFileSet->nSttF > 0) { + code = tsdbUpgradeStt(tsdb, pDFileSet, reader, fset); + TSDB_CHECK_CODE(code, lino, _exit); } tsdbDataFReaderClose(&reader); + code = TARRAY2_APPEND(fileSetArray, fset); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); @@ -258,7 +383,30 @@ _exit: return code; } -static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { +static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, TFileSetArray *fileSetArray) { + int32_t code = 0; + int32_t lino = 0; + + // upgrade each file set + for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { + code = tsdbUpgradeFileSet(tsdb, taosArrayGet(tsdb->fs.aDFileSet, i), fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // upgrade tomb file + if (tsdb->fs.pDelFile != NULL) { + code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { int32_t code = 0; int32_t lino = 0; @@ -268,19 +416,8 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { code = tsdbFSOpen(tsdb, rollback); TSDB_CHECK_CODE(code, lino, _exit); - // upgrade each file set - for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { - SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i); - - code = tsdbUpgradeFileSet(tsdb, pDFileSet, fileSetArray); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // upgrade tomb file - if (tsdb->fs.pDelFile != NULL) { - code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbDoUpgradeFileSystem(tsdb, fileSetArray); + TSDB_CHECK_CODE(code, lino, _exit); // close file system code = tsdbFSClose(tsdb); @@ -292,13 +429,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { code = save_fs(fileSetArray, fname); TSDB_CHECK_CODE(code, lino, _exit); - // clear - TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } + TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); return code; } @@ -308,7 +443,7 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { tsdbGetCurrentFName(tsdb, fname, NULL); if (!taosCheckExistFile(fname)) return 0; - int32_t code = tsdbDoUpgradeFileSystem(tsdb, rollback); + int32_t code = tsdbUpgradeFileSystem(tsdb, rollback); if (code) return code; taosRemoveFile(fname); diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.h b/source/dnode/vnode/src/tsdb/tsdbUpgrade.h index 4dec009613..f9aac94e00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.h +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.h @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#include "tsdbDataFileRW.h" #include "tsdbDef.h" #include "tsdbFS2.h" #include "tsdbUtil2.h" From cc0727aee31652dc95a33336dffd114d5c284a86 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 25 Jun 2023 17:13:12 +0800 Subject: [PATCH 3/8] more code --- source/dnode/vnode/src/tsdb/tsdbFSet2.c | 2 +- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 15 +-- source/dnode/vnode/src/tsdb/tsdbSttFileRW.h | 7 ++ source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 120 +++++++++++++++++++- 4 files changed, 129 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 7cbbfcfef7..02edd6550c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -15,7 +15,7 @@ #include "tsdbFSet2.h" -static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) { +int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) { if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY; lvl[0]->level = level; TARRAY2_INIT(lvl[0]->fobjArr); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 326550bac2..674802a6ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -15,13 +15,6 @@ #include "tsdbSttFileRW.h" -typedef struct { - SFDataPtr sttBlkPtr[1]; - SFDataPtr statisBlkPtr[1]; - SFDataPtr tombBlkPtr[1]; - SFDataPtr rsrvd[2]; -} SSttFooter; - // SSttFReader ============================================================ struct SSttFileReader { SSttFileReaderConfig config[1]; @@ -602,7 +595,7 @@ _exit: return code; } -int32_t tsdbFileDoWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) { +int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) { ptr->size = TARRAY2_DATA_LEN(sttBlkArray); if (ptr->size > 0) { ptr->offset = *fileSize; @@ -621,7 +614,7 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - code = tsdbFileDoWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size); + code = tsdbFileWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -680,7 +673,7 @@ _exit: return code; } -int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) { +int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) { int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); if (code) return code; *fileSize += sizeof(*footer); @@ -688,7 +681,7 @@ int32_t tsdbSttFileDoWriteFooterImpl(STsdbFD *fd, const SSttFooter *footer, int6 } static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { - return tsdbSttFileDoWriteFooterImpl(writer->fd, writer->footer, &writer->file->size); + return tsdbFileWriteSttFooter(writer->fd, writer->footer, &writer->file->size); } static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h index bc9b784e16..242b55795c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.h @@ -26,6 +26,13 @@ extern "C" { typedef TARRAY2(SSttBlk) TSttBlkArray; typedef TARRAY2(SStatisBlk) TStatisBlkArray; +typedef struct { + SFDataPtr sttBlkPtr[1]; + SFDataPtr statisBlkPtr[1]; + SFDataPtr tombBlkPtr[1]; + SFDataPtr rsrvd[2]; +} SSttFooter; + // SSttFileReader ========================================== typedef struct SSttFileReader SSttFileReader; typedef struct SSttFileReaderConfig SSttFileReaderConfig; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 83db3717ae..fdb3183c20 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -26,6 +26,9 @@ extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t TBrinBlkArray *brinBlkArray, uint8_t **bufArr); extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); +extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); +extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize); +extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize); static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; @@ -170,7 +173,20 @@ static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * int32_t code = 0; int32_t lino = 0; - // TODO + if (fset->farr[TSDB_FTYPE_HEAD] == NULL) { + return 0; + } + + STFile file = { + .type = TSDB_FTYPE_DATA, + .did = pDFileSet->diskId, + .fid = fset->fid, + .cid = pDFileSet->pDataF->commitID, + .size = pDFileSet->pDataF->size, + }; + + code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_DATA]); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -183,7 +199,20 @@ static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *r int32_t code = 0; int32_t lino = 0; - // TODO + if (fset->farr[TSDB_FTYPE_HEAD] == NULL) { + return 0; + } + + STFile file = { + .type = TSDB_FTYPE_SMA, + .did = pDFileSet->diskId, + .fid = fset->fid, + .cid = pDFileSet->pSmaF->commitID, + .size = pDFileSet->pSmaF->size, + }; + + code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_SMA]); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -192,11 +221,96 @@ _exit: return code; } +static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset, + int32_t iStt, SSttLvl *lvl) { + int32_t code = 0; + int32_t lino = 0; + + SArray *aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbReadSttBlk(reader, iStt, aSttBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(aSttBlk) > 0) { + SSttFile *pSttF = pDFileSet->aSttF[iStt]; + STFileObj *fobj; + struct { + int32_t szPage; + // writer + STsdbFD *fd; + TSttBlkArray sttBlkArray[1]; + SSttFooter footer[1]; + } ctx[1] = {{ + .szPage = tsdb->pVnode->config.tsdbPageSize, + }}; + + STFile file = { + .type = TSDB_FTYPE_STT, + .did = pDFileSet->diskId, + .fid = fset->fid, + .cid = pSttF->commitID, + .size = pSttF->size, + }; + code = tsdbTFileObjInit(tsdb, &file, &fobj); + TSDB_CHECK_CODE(code, lino, _exit1); + + code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit1); + + for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) { + code = TARRAY2_APPEND_PTR(ctx->sttBlkArray, (SSttBlk *)taosArrayGet(aSttBlk, iSttBlk)); + TSDB_CHECK_CODE(code, lino, _exit1); + } + + code = tsdbFileWriteSttBlk(ctx->fd, ctx->sttBlkArray, ctx->footer->sttBlkPtr, &fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit1); + + code = tsdbFileWriteSttFooter(ctx->fd, ctx->footer, &fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit1); + + code = tsdbFsyncFile(ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit1); + + tsdbCloseFile(&ctx->fd); + + code = TARRAY2_APPEND(lvl->fobjArr, fobj); + TSDB_CHECK_CODE(code, lino, _exit1); + + _exit1: + TARRAY2_DESTROY(ctx->sttBlkArray, NULL); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + taosArrayDestroy(aSttBlk); + return code; +} + static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; - // TODO + if (pDFileSet->nSttF == 0) { + return 0; + } + + SSttLvl *lvl; + code = tsdbSttLvlInit(0, &lvl); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t iStt = 0; iStt < pDFileSet->nSttF; ++iStt) { + code = tsdbUpgradeSttFile(tsdb, pDFileSet, reader, fset, iStt, lvl); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = TARRAY2_APPEND(fset->lvlArr, lvl); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { From b881b87218ccdacff1cc305c47c8d08d524e784b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 25 Jun 2023 18:02:31 +0800 Subject: [PATCH 4/8] more code --- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 26 +-- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 12 +- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 190 +++++++++++++------ 3 files changed, 150 insertions(+), 78 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 16559c83ab..7f5d9c83f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -15,10 +15,9 @@ #include "tsdbDataFileRW.h" -extern int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr); -extern int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, - int64_t *fileSize); +extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); // SDataFileReader ============================================= struct SDataFileReader { @@ -1161,8 +1160,8 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, - &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr); + code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, + &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -1178,8 +1177,8 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr, - &writer->files[TSDB_FTYPE_TOMB].size); + code = tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr, + &writer->files[TSDB_FTYPE_TOMB].size); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -1189,14 +1188,19 @@ _exit: return code; } +int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize) { + int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer)); + if (code) return code; + *fileSize += sizeof(*footer); + return 0; +} + static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size, - (const uint8_t *)writer->tombFooter, sizeof(STombFooter)); + code = tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter, &writer->files[TSDB_FTYPE_TOMB].size); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_TOMB].size += sizeof(STombFooter); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 674802a6ca..b09ab71e08 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -522,8 +522,8 @@ _exit: return code; } -int32_t tsdbFileDoWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr) { +int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr) { int32_t code; if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; @@ -584,8 +584,8 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, - writer->tombBlkArray, writer->config->bufArr); + code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size, + writer->tombBlkArray, writer->config->bufArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -644,7 +644,7 @@ _exit: return code; } -int32_t tsdbFileDoWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) { +int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) { ptr->size = TARRAY2_DATA_LEN(tombBlkArray); if (ptr->size > 0) { ptr->offset = *fileSize; @@ -663,7 +663,7 @@ static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileDoWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size); + code = tsdbFileWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index fdb3183c20..8efe87ef62 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -29,6 +29,10 @@ extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHe extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize); +extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, + TTombBlkArray *tombBlkArray, uint8_t **bufArr); +extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); +extern int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize); static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; @@ -362,35 +366,102 @@ _exit: return code; } +static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **fd, STFileObj **fobj, bool *toStt) { + int32_t code = 0; + int32_t lino = 0; + + if (TARRAY2_SIZE(fset->lvlArr) == 0) { // to .tomb file + *toStt = false; + + STFile file = { + .type = TSDB_FTYPE_TOMB, + .did = fset->farr[TSDB_FTYPE_HEAD]->f->did, + .fid = fset->fid, + .cid = 0, + .size = 0, + }; + + code = tsdbTFileObjInit(tsdb, &file, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + + fset->farr[TSDB_FTYPE_TOMB] = *fobj; + } else { // to .stt file + *toStt = true; + SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, 0); + + STFile file = { + .type = TSDB_FTYPE_STT, + .did = TARRAY2_GET(lvl->fobjArr, 0)->f->did, + .fid = fset->fid, + .cid = 0, + .size = 0, + }; + + code = tsdbTFileObjInit(tsdb, &file, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(lvl->fobjArr, fobj[0]); + TSDB_CHECK_CODE(code, lino, _exit); + } + + char fname[TSDB_FILENAME_LEN] = {0}; + code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize, + TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd); + TSDB_CHECK_CODE(code, lino, _exit); + + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; + code = tsdbWriteFile(fd[0], 0, hdr, TSDB_FHDR_SIZE); + TSDB_CHECK_CODE(code, lino, _exit); + fobj[0]->f->size += TSDB_FHDR_SIZE; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; - SArray *aDelData = NULL; - int64_t minKey, maxKey; - STombBlock tombBlock[1] = {0}; - TTombBlkArray tombBlkArray[1] = {0}; - STsdbFD *fd = NULL; + struct { + // context + bool toStt; + int8_t cmprAlg; + int32_t maxRow; + int64_t minKey; + int64_t maxKey; + uint8_t *bufArr[8]; + // reader + SArray *aDelData; + // writer + STsdbFD *fd; + STFileObj *fobj; + STombBlock tombBlock[1]; + TTombBlkArray tombBlkArray[1]; + STombFooter tombFooter[1]; + SSttFooter sttFooter[1]; + } ctx[1] = {{ + .maxRow = tsdb->pVnode->config.tsdbCfg.maxRows, + .cmprAlg = tsdb->pVnode->config.tsdbCfg.compression, + }}; - tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); + tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &ctx->minKey, &ctx->maxKey); - if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { + if ((ctx->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) { - SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i); + for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) { + SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(aDelIdx, iDelIdx); - code = tsdbReadDelData(reader, pDelIdx, aDelData); + code = tsdbReadDelData(reader, pDelIdx, ctx->aDelData); TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) { - SDelData *pDelData = taosArrayGet(aDelData, j); - - if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { - continue; - } + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(ctx->aDelData); iDelData++) { + SDelData *pDelData = (SDelData *)taosArrayGet(ctx->aDelData, iDelData); STombRecord record = { .suid = pDelIdx->suid, @@ -400,64 +471,62 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * .ekey = pDelData->eKey, }; - code = tTombBlockPut(tombBlock, &record); + code = tTombBlockPut(ctx->tombBlock, &record); TSDB_CHECK_CODE(code, lino, _exit); - if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { - if (fd == NULL) { - STFile file = { - .type = TSDB_FTYPE_TOMB, - .did = {0}, // TODO - .fid = fset->fid, - .cid = 0, // TODO - }; - - code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]); + if (TOMB_BLOCK_SIZE(ctx->tombBlock) > ctx->maxRow) { + if (ctx->fd == NULL) { + code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize, - TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd); - TSDB_CHECK_CODE(code, lino, _exit); - - uint8_t hdr[TSDB_FHDR_SIZE] = {0}; - code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE); - TSDB_CHECK_CODE(code, lino, _exit); - fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr); } - - // TODO - tTombBlockClear(tombBlock); + code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, + ctx->bufArr); + TSDB_CHECK_CODE(code, lino, _exit); } } } - if (TOMB_BLOCK_SIZE(tombBlock) > 0) { - // TODO - tTombBlockClear(tombBlock); - } - - if (TARRAY2_SIZE(tombBlkArray) > 0) { - // TODO - } - - if (fd) { - // write footer - - // sync and close - code = tsdbFsyncFile(fd); + if (TOMB_BLOCK_SIZE(ctx->tombBlock) > 0) { + if (ctx->fd == NULL) { + code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt); + TSDB_CHECK_CODE(code, lino, _exit); + } + code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, + ctx->bufArr); TSDB_CHECK_CODE(code, lino, _exit); - tsdbCloseFile(&fd); } - // clear - TARRAY2_DESTROY(tombBlkArray, NULL); - tTombBlockDestroy(tombBlock); - taosArrayDestroy(aDelData); + if (ctx->fd != NULL) { + if (ctx->toStt) { + code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbFsyncFile(ctx->fd); + TSDB_CHECK_CODE(code, lino, _exit); + + tsdbCloseFile(&ctx->fd); + } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } + for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); i++) { + tFree(ctx->bufArr[i]); + } + TARRAY2_DESTROY(ctx->tombBlkArray, NULL); + tTombBlockDestroy(ctx->tombBlock); + taosArrayDestroy(ctx->aDelData); return code; } @@ -487,13 +556,12 @@ static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArra } } - tsdbDelFReaderClose(&reader); - taosArrayDestroy(aDelIdx); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } + tsdbDelFReaderClose(&reader); + taosArrayDestroy(aDelIdx); return code; } From 27e17048ac3008bd38520c66b739885f192a0deb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Jun 2023 09:06:59 +0800 Subject: [PATCH 5/8] fix: retention coredump --- source/dnode/vnode/src/tsdb/tsdbRetention.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 6580b89dad..c8236e8f52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -23,7 +23,7 @@ typedef struct { int64_t cid; TFileSetArray *fsetArr; - TFileOpArray *fopArr; + TFileOpArray fopArr[1]; struct { int32_t fsetArrIdx; From 713deac25ea5e569d908d2c79ccb909d878bb48b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Jun 2023 13:10:01 +0800 Subject: [PATCH 6/8] fix update code --- source/common/src/tdataformat.c | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 0b12177754..7c6939635a 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2245,15 +2245,18 @@ static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_ } return 0; } +static FORCE_INLINE int32_t tColDataUpdateNothing(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { + return 0; +} static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) = { - {NULL, NULL, NULL}, // 0 - {tColDataUpdateValue10, NULL, tColDataUpdateValue12}, // HAS_NONE - {tColDataUpdateValue20, NULL, NULL}, // HAS_NULL - {tColDataUpdateValue30, NULL, tColDataUpdateValue32}, // HAS_NULL|HAS_NONE - {tColDataUpdateValue40, NULL, tColDataUpdateValue42}, // HAS_VALUE - {tColDataUpdateValue50, NULL, tColDataUpdateValue52}, // HAS_VALUE|HAS_NONE - {tColDataUpdateValue60, NULL, tColDataUpdateValue62}, // HAS_VALUE|HAS_NULL - {tColDataUpdateValue70, NULL, tColDataUpdateValue72}, // HAS_VALUE|HAS_NULL|HAS_NONE + {NULL, NULL, NULL}, // 0 + {tColDataUpdateValue10, tColDataUpdateNothing, tColDataUpdateValue12}, // HAS_NONE + {tColDataUpdateValue20, tColDataUpdateNothing, tColDataUpdateNothing}, // HAS_NULL + {tColDataUpdateValue30, tColDataUpdateNothing, tColDataUpdateValue32}, // HAS_NULL|HAS_NONE + {tColDataUpdateValue40, tColDataUpdateNothing, tColDataUpdateValue42}, // HAS_VALUE + {tColDataUpdateValue50, tColDataUpdateNothing, tColDataUpdateValue52}, // HAS_VALUE|HAS_NONE + {tColDataUpdateValue60, tColDataUpdateNothing, tColDataUpdateValue62}, // HAS_VALUE|HAS_NULL + {tColDataUpdateValue70, tColDataUpdateNothing, tColDataUpdateValue72}, // HAS_VALUE|HAS_NULL|HAS_NONE // VALUE NONE NULL }; From e79683a684dd249ba250902a55bfe9f7412f47e9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Jun 2023 13:57:18 +0800 Subject: [PATCH 7/8] fix more --- source/dnode/vnode/src/inc/tsdb.h | 4 ++-- source/dnode/vnode/src/tsdb/tsdbIter.c | 6 ++++++ source/dnode/vnode/src/tsdb/tsdbMerge.c | 2 +- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ee7ebfcd4c..180421fec1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -202,7 +202,7 @@ int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItem // other int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey); -int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now); +int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec); int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline); int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); @@ -707,7 +707,7 @@ typedef struct SSttBlockLoadInfo { SArray *aSttBlk; SArray *pTombBlockArray; // tomb block array list - int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t currentLoadBlockIndex; int32_t loadBlocks; double elapsedTime; diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.c b/source/dnode/vnode/src/tsdb/tsdbIter.c index 69289d5a4f..9780cc6be6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbIter.c @@ -629,6 +629,12 @@ static int32_t tsdbIterSkipTableData(STsdbIter *iter, const TABLEID *tbid) { return tsdbDataIterNext(iter, tbid); case TSDB_ITER_TYPE_MEMT: return tsdbMemTableIterNext(iter, tbid); + case TSDB_ITER_TYPE_STT_TOMB: + return tsdbSttTombIterNext(iter, tbid); + case TSDB_ITER_TYPE_DATA_TOMB: + return tsdbDataTombIterNext(iter, tbid); + case TSDB_ITER_TYPE_MEMT_TOMB: + return tsdbMemTombIterNext(iter, tbid); default: ASSERT(false); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 3b6f3a71bc..c4458a12c5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -52,7 +52,7 @@ typedef struct { } SMerger; static int32_t tsdbMergerOpen(SMerger *merger) { - merger->ctx->now = taosGetTimestampMs(); + merger->ctx->now = taosGetTimestampSec(); merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows; merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows; merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a294d24f2d..8a6aa7162d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -766,7 +766,7 @@ static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) { writer->ctx->fid = fid; writer->ctx->fset = TARRAY2_SEARCH_EX(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ); - int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, writer->now); + int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec()); if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) { code = TSDB_CODE_NO_AVAIL_DISK; TSDB_CHECK_CODE(code, lino, _exit); From 5ad9bd938fa07b36e89b6dce98b714d8bbd6ece2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Jun 2023 15:47:36 +0800 Subject: [PATCH 8/8] change default stt trigger as 2 --- include/util/tdef.h | 46 +++++++++++++-------------- source/dnode/vnode/src/vnd/vnodeCfg.c | 15 ++++----- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 69b012ecea..3ed20c6455 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -191,16 +191,16 @@ typedef enum ELogicConditionType { #define TSDB_MAX_COLUMNS 4096 #define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns -#define TSDB_NODE_NAME_LEN 64 -#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string -#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string -#define TSDB_CGROUP_LEN 193 // it is a null-terminated string -#define TSDB_OFFSET_LEN 64 // it is a null-terminated string -#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string -#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string -#define TSDB_DB_NAME_LEN 65 -#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) -#define TSDB_PRIVILEDGE_CONDITION_LEN 200 +#define TSDB_NODE_NAME_LEN 64 +#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string +#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string +#define TSDB_CGROUP_LEN 193 // it is a null-terminated string +#define TSDB_OFFSET_LEN 64 // it is a null-terminated string +#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string +#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string +#define TSDB_DB_NAME_LEN 65 +#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) +#define TSDB_PRIVILEDGE_CONDITION_LEN 200 #define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_COMMENT_LEN 1024 * 1024 @@ -249,15 +249,15 @@ typedef enum ELogicConditionType { #define TSDB_LABEL_LEN 8 #define TSDB_JOB_STATUS_LEN 32 -#define TSDB_CLUSTER_ID_LEN 40 -#define TSDB_FQDN_LEN 128 -#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) -#define TSDB_IPv4ADDR_LEN 16 -#define TSDB_FILENAME_LEN 128 -#define TSDB_SHOW_SQL_LEN 2048 +#define TSDB_CLUSTER_ID_LEN 40 +#define TSDB_FQDN_LEN 128 +#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6) +#define TSDB_IPv4ADDR_LEN 16 +#define TSDB_FILENAME_LEN 128 +#define TSDB_SHOW_SQL_LEN 2048 #define TSDB_SHOW_SCHEMA_JSON_LEN TSDB_MAX_COLUMNS * 256 -#define TSDB_SLOW_QUERY_SQL_LEN 512 -#define TSDB_SHOW_SUBQUERY_LEN 1000 +#define TSDB_SLOW_QUERY_SQL_LEN 512 +#define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_TYPE_LEN 16 @@ -370,7 +370,7 @@ typedef enum ELogicConditionType { #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_MIN_STT_TRIGGER 1 #define TSDB_MAX_STT_TRIGGER 16 -#define TSDB_DEFAULT_SST_TRIGGER 1 +#define TSDB_DEFAULT_SST_TRIGGER 2 #define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN) #define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2) #define TSDB_DEFAULT_HASH_PREFIX 0 @@ -410,10 +410,10 @@ typedef enum ELogicConditionType { #define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024) #define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY_PLAN" -#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519 -#define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 -#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 -#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 +#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519 +#define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 +#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 +#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519 #define PRIMARYKEY_TIMESTAMP_COL_ID 1 #define COL_REACH_END(colId, maxColId) ((colId) > (maxColId)) diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index efe82e1783..2e161d728f 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -49,7 +49,7 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1, .hashBegin = 0, .hashEnd = 0, .hashMethod = 0, - .sttTrigger = TSDB_DEFAULT_STT_FILE, + .sttTrigger = TSDB_DEFAULT_SST_TRIGGER, .tsdbPageSize = TSDB_DEFAULT_PAGE_SIZE}; int vnodeCheckCfg(const SVnodeCfg *pCfg) { @@ -57,7 +57,7 @@ int vnodeCheckCfg(const SVnodeCfg *pCfg) { return 0; } -const char* vnodeRoleToStr(ESyncRole role) { +const char *vnodeRoleToStr(ESyncRole role) { switch (role) { case TAOS_SYNC_ROLE_VOTER: return "true"; @@ -68,11 +68,11 @@ const char* vnodeRoleToStr(ESyncRole role) { } } -const ESyncRole vnodeStrToRole(char* str) { - if(strcmp(str, "true") == 0){ +const ESyncRole vnodeStrToRole(char *str) { + if (strcmp(str, "true") == 0) { return TAOS_SYNC_ROLE_VOTER; } - if(strcmp(str, "false") == 0){ + if (strcmp(str, "false") == 0) { return TAOS_SYNC_ROLE_LEARNER; } @@ -295,10 +295,9 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { char role[10] = {0}; code = tjsonGetStringValue(info, "isReplica", role); if (code < 0) return -1; - if(strlen(role) != 0){ + if (strlen(role) != 0) { pNode->nodeRole = vnodeStrToRole(role); - } - else{ + } else { pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; } vDebug("vgId:%d, decode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort,