From ef524f696513a9088af2772e1e3f527c5a9127df Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 8 Jun 2023 15:35:43 +0800 Subject: [PATCH] more code --- include/util/tarray2.h | 13 ++ .../vnode/src/tsdb/dev/inc/tsdbSttFileRW.h | 2 +- .../dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h | 16 +- .../dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c | 147 ++++++++++-------- 4 files changed, 104 insertions(+), 74 deletions(-) diff --git a/include/util/tarray2.h b/include/util/tarray2.h index a30184b6e8..ad01c66d88 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -116,6 +116,19 @@ static FORCE_INLINE int32_t tarray2_make_room( // #define TARRAY2_APPEND(a, e) TARRAY2_INSERT(a, (a)->size, e) #define TARRAY2_APPEND_PTR(a, ep) TARRAY2_APPEND(a, *(ep)) +#define TARRAY2_APPEND_BATCH(a, ep, n) \ + ({ \ + int32_t __ret = 0; \ + if ((a)->size + (n) > (a)->capacity) { \ + __ret = tarray2_make_room((a), (a)->size + (n), sizeof(typeof((a)->data[0]))); \ + } \ + if (!__ret) { \ + memcpy((a)->data + (a)->size, (ep), sizeof(typeof((a)->data[0])) * (n)); \ + (a)->size += (n); \ + } \ + __ret; \ + }) + // return (TYPE *) #define TARRAY2_SEARCH(a, ep, cmp, flag) \ ({ \ diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 8526a1143a..4e183830e6 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -71,7 +71,7 @@ struct SSttFileWriterConfig { int32_t maxRow; int32_t szPage; int8_t cmprAlg; - int64_t compactVersion; // compact version + int64_t compactVersion; STFile file; SSkmInfo *skmTb; SSkmInfo *skmRow; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index 9ecbe50cbf..9a74cc6494 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -47,13 +47,15 @@ typedef union { } STombBlock; typedef struct { - int32_t numRec; - int32_t size[TOMB_RECORD_ELEM_NUM]; + SFDataPtr dp[1]; TABLEID minTbid; TABLEID maxTbid; int64_t minVer; int64_t maxVer; - SFDataPtr dp[1]; + int32_t numRec; + int32_t size[TOMB_RECORD_ELEM_NUM]; + int8_t cmprAlg; + int8_t rsvd[7]; } STombBlk; #define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) @@ -94,13 +96,15 @@ typedef union { } STbStatisBlock; typedef struct { - int32_t numRec; - int32_t size[STATIS_RECORD_NUM_ELEM]; + SFDataPtr dp[1]; TABLEID minTbid; TABLEID maxTbid; int64_t minVer; int64_t maxVer; - SFDataPtr dp[1]; + int32_t numRec; + int32_t size[STATIS_RECORD_NUM_ELEM]; + int8_t cmprAlg; + int8_t rsvd[7]; } SStatisBlk; #define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 9c80bc4c43..350fa233d6 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -69,9 +69,9 @@ _exit: static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) { if (reader[0]) { - TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL); TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL); TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL); + TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL); taosMemoryFree(reader[0]); reader[0] = NULL; } @@ -86,11 +86,20 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; reader[0]->config[0] = config[0]; - if (!reader[0]->config->bufArr) reader[0]->config->bufArr = reader[0]->bufArr; + if (reader[0]->config->bufArr == NULL) { + reader[0]->config->bufArr = reader[0]->bufArr; + } // open file - code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); - TSDB_CHECK_CODE(code, lino, _exit); + if (fname) { + code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + char fname1[TSDB_FILENAME_LEN]; + tsdbTFileName(config->tsdb, config->file, fname1); + code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } // open each segment reader int64_t size = config->file->size; @@ -256,14 +265,12 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk, int64_t size = 0; for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) { code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, - TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, + tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->reader->config->bufArr[2]); TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t j = 0; j < tombBlk->numRec; ++j) { - code = TARRAY2_APPEND(&dData->dataArr[i], ((int64_t *)(reader->reader->config->bufArr[1]))[j]); - continue; - } + code = TARRAY2_APPEND_BATCH(&dData->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec); + TSDB_CHECK_CODE(code, lino, _exit); size += tombBlk->size[i]; } @@ -292,14 +299,12 @@ int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *stat int64_t size = 0; for (int32_t i = 0; i < ARRAY_SIZE(sData->dataArr); ++i) { code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, - TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, + statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, &reader->reader->config->bufArr[2]); TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t j = 0; j < statisBlk->numRec; ++j) { - code = TARRAY2_APPEND(sData->dataArr + i, ((int64_t *)reader->reader->config->bufArr[1])[j]); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = TARRAY2_APPEND_BATCH(sData->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec); + TSDB_CHECK_CODE(code, lino, _exit); size += statisBlk->size[i]; } @@ -333,7 +338,6 @@ struct SSttFileWriter { // helper data SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; - int32_t sizeArr[5]; uint8_t *bufArr[5]; STsdbFD *fd; }; @@ -362,18 +366,19 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow]; } - code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, writer->sizeArr); + int32_t sizeArr[5] = {0}; + code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); TSDB_CHECK_CODE(code, lino, _exit); sttBlk->bInfo.offset = writer->file->size; - sttBlk->bInfo.szKey = writer->sizeArr[2] + writer->sizeArr[3]; - sttBlk->bInfo.szBlock = writer->sizeArr[0] + writer->sizeArr[1] + sttBlk->bInfo.szKey; + sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3]; + sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey; for (int32_t i = 3; i >= 0; i--) { - if (writer->sizeArr[i]) { - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], writer->sizeArr[i]); + if (sizeArr[i]) { + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]); TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += writer->sizeArr[i]; + writer->file->size += sizeArr[i]; } } @@ -390,13 +395,17 @@ _exit: } static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { - if (STATIS_BLOCK_SIZE(writer->sData)) return 0; + if (STATIS_BLOCK_SIZE(writer->sData) == 0) return 0; int32_t code = 0; int32_t lino = 0; SStatisBlk statisBlk[1] = {{ - .numRec = STATIS_BLOCK_SIZE(writer->sData), + .dp[0] = + { + .offset = writer->file->size, + .size = 0, + }, .minTbid = { .suid = TARRAY2_FIRST(writer->sData->suid), @@ -409,6 +418,8 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { }, .minVer = TARRAY2_FIRST(writer->sData->minVer), .maxVer = TARRAY2_FIRST(writer->sData->maxVer), + .numRec = STATIS_BLOCK_SIZE(writer->sData), + .cmprAlg = writer->config->cmprAlg, }}; for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->sData); i++) { @@ -416,13 +427,10 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { statisBlk->maxVer = TMAX(statisBlk->maxVer, TARRAY2_GET(writer->sData->maxVer, i)); } - statisBlk->dp->offset = writer->file->size; - statisBlk->dp->size = 0; - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) { int32_t size; code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->sData->dataArr + i), - TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, + TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg, &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); @@ -453,7 +461,11 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { int32_t lino = 0; STombBlk tombBlk[1] = {{ - .numRec = TOMB_BLOCK_SIZE(writer->tData), + .dp[0] = + { + .offset = writer->file->size, + .size = 0, + }, .minTbid = { .suid = TARRAY2_FIRST(writer->tData->suid), @@ -466,11 +478,8 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { }, .minVer = TARRAY2_FIRST(writer->tData->version), .maxVer = TARRAY2_FIRST(writer->tData->version), - .dp[0] = - { - .offset = writer->file->size, - .size = 0, - }, + .numRec = TOMB_BLOCK_SIZE(writer->tData), + .cmprAlg = writer->config->cmprAlg, }}; for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tData); i++) { @@ -481,7 +490,7 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) { for (int32_t i = 0; i < ARRAY_SIZE(writer->tData->dataArr); i++) { int32_t size; code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]), - TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, + TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]); TSDB_CHECK_CODE(code, lino, _exit); @@ -509,10 +518,9 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer->sttBlkPtr->offset = writer->file->size; writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray); - if (writer->footer->sttBlkPtr->size) { + writer->footer->sttBlkPtr->offset = writer->file->size; code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray), writer->footer->sttBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); @@ -530,10 +538,9 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer->statisBlkPtr->offset = writer->file->size; writer->footer->statisBlkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray); - if (writer->footer->statisBlkPtr->size) { + writer->footer->statisBlkPtr->offset = writer->file->size; code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray), writer->footer->statisBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); @@ -551,10 +558,9 @@ static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - writer->footer->tombBlkPtr->offset = writer->file->size; writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray); - if (writer->footer->tombBlkPtr->size) { + writer->footer->tombBlkPtr->offset = writer->file->size; code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->footer->tombBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); @@ -591,7 +597,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t flag; char fname[TSDB_FILENAME_LEN]; - if (writer->file->size) { + if (writer->file->size > 0) { flag = TD_FILE_READ | TD_FILE_WRITE; } else { flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; @@ -601,7 +607,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); - if (!writer->file->size) { + if (writer->file->size == 0) { uint8_t hdr[TSDB_FHDR_SIZE] = {0}; code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr)); TSDB_CHECK_CODE(code, lino, _exit); @@ -618,9 +624,9 @@ _exit: } static void tsdbSttFWriterDoClose(SSttFileWriter *writer) { - ASSERT(!writer->fd); + ASSERT(writer->fd == NULL); - for (int32_t i = 0; i < ARRAY_SIZE(writer->sizeArr); ++i) { + for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) { tFree(writer->bufArr[i]); } tDestroyTSchema(writer->skmRow->pTSchema); @@ -672,12 +678,21 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o tsdbCloseFile(&writer->fd); ASSERT(writer->config->file.size < writer->file->size); - STFileOp op = { - .optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE, - .fid = writer->config->file.fid, - .of = writer->config->file, - .nf = writer->file[0], - }; + STFileOp op; + if (writer->config->file.size == 0) { + op = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->file.fid, + .nf = writer->file[0], + }; + } else { + op = (STFileOp){ + .optype = TSDB_FOP_MODIFY, + .fid = writer->config->file.fid, + .of = writer->config->file, + .nf = writer->file[0], + }; + } code = TARRAY2_APPEND(opArray, op); TSDB_CHECK_CODE(code, lino, _exit); @@ -702,7 +717,6 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) { tsdbCloseFile(&writer->fd); taosRemoveFile(fname); } - return 0; } @@ -770,7 +784,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { TSDB_CHECK_CODE(code, lino, _exit); } - STbStatisRecord record[1] = {{ + STbStatisRecord record = { .suid = row->suid, .uid = row->uid, .firstKey = key->ts, @@ -778,18 +792,17 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { .minVer = key->version, .maxVer = key->version, .count = 1, - }}; - code = tStatisBlockPut(writer->sData, record); + }; + code = tStatisBlockPut(writer->sData, &record); TSDB_CHECK_CODE(code, lino, _exit); } else { + ASSERT(key->ts >= TARRAY2_LAST(writer->sData->lastKey)); + TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version); TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version); if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) { TARRAY2_LAST(writer->sData->count)++; TARRAY2_LAST(writer->sData->lastKey) = key->ts; - } else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) { - } else { - ASSERTS(0, "timestamp should be in ascending order"); } } @@ -830,6 +843,7 @@ int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) { int32_t code = 0; int32_t lino = 0; + // TODO: optimize here SRowInfo row[1]; row->suid = bdata->suid; for (int32_t i = 0; i < bdata->nRow; i++) { @@ -854,17 +868,16 @@ int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *re if (!writer->ctx->opened) { code = tsdbSttFWriterDoOpen(writer); return code; - } + } else { + if (writer->bData->nRow > 0) { + code = tsdbSttFileDoWriteTSDataBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } - // end time-series data write - if (writer->bData->nRow > 0) { - code = tsdbSttFileDoWriteTSDataBlock(writer); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (STATIS_BLOCK_SIZE(writer->sData) > 0) { - code = tsdbSttFileDoWriteStatisBlock(writer); - TSDB_CHECK_CODE(code, lino, _exit); + if (STATIS_BLOCK_SIZE(writer->sData) > 0) { + code = tsdbSttFileDoWriteStatisBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } } // write SDelRecord