From 442586692b90c204683e1bf2b203da8ff8a32d8b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 27 May 2023 19:01:52 +0800 Subject: [PATCH] more code --- .../dnode/vnode/src/tsdb/dev/inc/tsdbFile.h | 2 +- .../src/tsdb/dev/inc/tsdbSttFReaderWriter.h | 2 +- .../dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h | 127 ++++++---- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 6 +- source/dnode/vnode/src/tsdb/dev/tsdbFSet.c | 8 +- source/dnode/vnode/src/tsdb/dev/tsdbFile.c | 10 +- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 6 +- .../vnode/src/tsdb/dev/tsdbSttFReaderWriter.c | 218 +++++++++--------- source/dnode/vnode/src/tsdb/dev/tsdbUtil.c | 99 ++++---- 9 files changed, 271 insertions(+), 207 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h index fc40437088..885560d897 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h @@ -65,7 +65,7 @@ struct STFile { struct { int32_t level; int32_t nseg; - } stt; + } stt[1]; }; }; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index 4a5c148751..2935455bad 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -64,7 +64,7 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig; int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer); int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op); -int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *pRowInfo); +int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row); int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index ed4de45736..4cd2101dd5 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -23,57 +23,100 @@ extern "C" { #endif // SDelBlock ---------- -typedef struct SDelBlock SDelBlock; -typedef struct SDelBlk SDelBlk; -int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity); -int32_t tDelBlockDestroy(SDelBlock *pDelBlock); -int32_t tDelBlockClear(SDelBlock *pDelBlock); -int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData); +typedef union { + int64_t aData[5]; + struct { + int64_t suid; + int64_t uid; + int64_t version; + int64_t skey; + int64_t ekey; + }; +} SDelRecord; + +typedef union { + TARRAY2(int64_t) aData[5]; + struct { + TARRAY2(int64_t) aSuid[1]; + TARRAY2(int64_t) aUid[1]; + TARRAY2(int64_t) aVer[1]; + TARRAY2(int64_t) aSkey[1]; + TARRAY2(int64_t) aEkey[1]; + }; +} SDelBlock; + +typedef struct SDelBlk { + int32_t nRow; + TABLEID minTid; + TABLEID maxTid; + int64_t minVer; + int64_t maxVer; + SFDataPtr dp; +} SDelBlk; + +#define DEL_BLOCK_SIZE(db) TARRAY2_SIZE((db)->aSuid) + +int32_t tDelBlockInit(SDelBlock *delBlock); +int32_t tDelBlockFree(SDelBlock *delBlock); +int32_t tDelBlockClear(SDelBlock *delBlock); +int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord); +int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size); +int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock); // STbStatisBlock ---------- -typedef struct STbStatisBlock STbStatisBlock; -typedef struct STbStatisBlk STbStatisBlk; +typedef union { + int64_t aData[9]; + struct { + int64_t suid; + int64_t uid; + int64_t firstKey; + int64_t firstVer; + int64_t lastKey; + int64_t lastVer; + int64_t minVer; + int64_t maxVer; + int64_t count; + }; +} STbStatisRecord; -int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity); -int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock); -int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock); +typedef union { + TARRAY2(int64_t) aData[9]; + struct { + TARRAY2(int64_t) suid[1]; + TARRAY2(int64_t) uid[1]; + TARRAY2(int64_t) firstKey[1]; + TARRAY2(int64_t) firstVer[1]; + TARRAY2(int64_t) lastKey[1]; + TARRAY2(int64_t) lastVer[1]; + TARRAY2(int64_t) minVer[1]; + TARRAY2(int64_t) maxVer[1]; + TARRAY2(int64_t) aCount[1]; + }; +} STbStatisBlock; + +typedef struct STbStatisBlk { + int32_t numRec; + TABLEID minTid; + TABLEID maxTid; + int64_t minVer; + int64_t maxVer; + SFDataPtr dp; +} STbStatisBlk; + +#define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) + +int32_t tStatisBlockInit(STbStatisBlock *statisBlock); +int32_t tStatisBlockFree(STbStatisBlock *statisBlock); +int32_t tStatisBlockClear(STbStatisBlock *statisBlock); +int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *statisRecord); +int32_t tStatisBlockEncode(STbStatisBlock *statisBlock, void *buf, int32_t size); +int32_t tStatisBlockDecode(const void *buf, STbStatisBlock *statisBlock); // other apis int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb); int32_t tsdbUpdateSkmRow(STsdb *pTsdb, const TABLEID *tbid, int32_t sver, SSkmInfo *pSkmRow); -/* Exposed Structs */ -// -struct SDelBlock { - int32_t capacity; - int32_t nRow; - int64_t *aData[5]; // [suid, uid, version, skey, ekey -}; - -struct SDelBlk { - int32_t nRow; - TABLEID minTid; - TABLEID maxTid; - int64_t minVer; - int64_t maxVer; - SFDataPtr dp; -}; - -struct STbStatisBlock { - int32_t capacity; - int32_t nRow; - int64_t *aData[7]; // [suid, uid, skey, sver, ekey, ever, count] -}; -struct STbStatisBlk { - int32_t nRow; - TABLEID minTid; - TABLEID maxTid; - int64_t minVer; - int64_t maxVer; - SFDataPtr dp; -}; - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 08b77c8587..ac5ee17d8c 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -70,8 +70,8 @@ static int32_t open_writer_with_new_stt(SCommitter *pCommitter) { config.file.fid = pCommitter->fid; config.file.cid = pCommitter->eid; config.file.size = 0; - config.file.stt.level = 0; - config.file.stt.nseg = 0; + config.file.stt->level = 0; + config.file.stt->nseg = 0; code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter); TSDB_CHECK_CODE(code, lino, _exit); @@ -126,7 +126,7 @@ static int32_t open_committer_writer(SCommitter *pCommitter) { ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0); STFileObj *fobj = TARRAY2_LAST(&lvl0->farr); - if (fobj->f.stt.nseg >= pCommitter->sttTrigger) { + if (fobj->f.stt->nseg >= pCommitter->sttTrigger) { return open_writer_with_new_stt(pCommitter); } else { return open_writer_with_exist_stt(pCommitter, &fobj->f); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index d4cf89b144..849ac7a663 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -267,9 +267,9 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { if (code) return code; if (fobj->f.type == TSDB_FTYPE_STT) { - SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt.level); + SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt->level); if (!lvl) { - code = tsdbSttLvlInit(fobj->f.stt.level, &lvl); + code = tsdbSttLvlInit(fobj->f.stt->level, &lvl); if (code) return code; code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn); @@ -285,7 +285,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { } else if (op->optype == TSDB_FOP_REMOVE) { // delete a file if (op->of.type == TSDB_FTYPE_STT) { - SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt.level); + SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt->level); ASSERT(lvl); STFileObj tfobj = {.f = {.cid = op->of.cid}}; @@ -305,7 +305,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { } } else { if (op->nf.type == TSDB_FTYPE_STT) { - SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt.level); + SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt->level); ASSERT(lvl); STFileObj tfobj = {.f = {.cid = op->of.cid}}, *tfobjp = &tfobj; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFile.c b/source/dnode/vnode/src/tsdb/dev/tsdbFile.c index fdb3a881d8..7330e413c2 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFile.c @@ -130,12 +130,12 @@ static int32_t stt_to_json(const STFile *file, cJSON *json) { if (code) return code; /* lvl */ - if (cJSON_AddNumberToObject(json, "level", file->stt.level) == NULL) { + if (cJSON_AddNumberToObject(json, "level", file->stt->level) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } /* nseg */ - if (cJSON_AddNumberToObject(json, "nseg", file->stt.nseg) == NULL) { + if (cJSON_AddNumberToObject(json, "nseg", file->stt->nseg) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -155,7 +155,7 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) { /* lvl */ item = cJSON_GetObjectItem(json, "level"); if (cJSON_IsNumber(item)) { - file->stt.level = item->valuedouble; + file->stt->level = item->valuedouble; } else { return TSDB_CODE_FILE_CORRUPTED; } @@ -163,7 +163,7 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) { /* nseg */ item = cJSON_GetObjectItem(json, "nseg"); if (cJSON_IsNumber(item)) { - file->stt.nseg = item->valuedouble; + file->stt->nseg = item->valuedouble; } else { return TSDB_CODE_FILE_CORRUPTED; } @@ -290,7 +290,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) { bool tsdbIsTFileChanged(const STFile *f1, const STFile *f2) { if (f1->size != f2->size) return true; - if (f1->type == TSDB_FTYPE_STT && f1->stt.nseg != f2->stt.nseg) return true; + if (f1->type == TSDB_FTYPE_STT && f1->stt->nseg != f2->stt->nseg) return true; return false; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index b4176d9526..d21823bd92 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -189,7 +189,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { } fobj = TARRAY2_GET(&lvl->farr, 0); - if (fobj->f.stt.nseg < merger->tsdb->pVnode->config.sttTrigger) { + if (fobj->f.stt->nseg < merger->tsdb->pVnode->config.sttTrigger) { merger->ctx.toData = false; break; } else { @@ -249,7 +249,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { .fid = fset->fid, .cid = merger->cid, .size = 0, - .stt = {.level = merger->ctx.level, .nseg = 0}, + .stt = {{.level = merger->ctx.level, .nseg = 0}}, }, }; code = tsdbSttFWriterOpen(&config, &merger->sttWriter); @@ -362,7 +362,7 @@ int32_t tsdbMerge(STsdb *tsdb) { fobj = TARRAY2_GET(&lvl0->farr, 0); - if (fobj->f.stt.nseg >= sttTrigger) { + if (fobj->f.stt->nseg >= sttTrigger) { code = tsdbMergeFileSet(&merger, fset); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index 7c76abde0a..9a08302ef6 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -122,7 +122,7 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader ** size = segReader->footer->prevFooter; } - ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt.nseg); + ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt->nseg); _exit: if (code) { @@ -250,9 +250,9 @@ struct SSttFileWriter { SSttFileWriterConfig config[1]; struct { bool opened; - } ctx; + } ctx[1]; // file - STFile file; + STFile file[1]; // data TSttBlkArray sttBlkArray[1]; TDelBlkArray delBlkArray[1]; @@ -262,8 +262,8 @@ struct SSttFileWriter { SDelBlock dData[1]; STbStatisBlock sData[1]; // helper data - SSkmInfo skmTb; - SSkmInfo skmRow; + SSkmInfo skmTb[1]; + SSkmInfo skmRow[1]; int32_t aBufSize[5]; uint8_t *aBuf[5]; STsdbFD *fd; @@ -292,15 +292,15 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->aBufSize); TSDB_CHECK_CODE(code, lino, _exit); - sttBlk->bInfo.offset = writer->file.size; + sttBlk->bInfo.offset = writer->file->size; sttBlk->bInfo.szKey = writer->aBufSize[2] + writer->aBufSize[3]; sttBlk->bInfo.szBlock = writer->aBufSize[0] + writer->aBufSize[1] + sttBlk->bInfo.szKey; for (int32_t i = 3; i >= 0; i--) { if (writer->aBufSize[i]) { - code = tsdbWriteFile(writer->fd, writer->file.size, writer->config->aBuf[i], writer->aBufSize[i]); + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->aBufSize[i]); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->aBufSize[i]; + writer->file->size += writer->aBufSize[i]; } } tBlockDataClear(writer->bData); @@ -317,37 +317,38 @@ _exit: } static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { - if (writer->sData->nRow == 0) return 0; + if (STATIS_BLOCK_SIZE(writer->sData)) return 0; int32_t code = 0; int32_t lino = 0; - STbStatisBlk statisBlk[1]; + STbStatisBlk statisBlk[1] = {{ + .numRec = STATIS_BLOCK_SIZE(writer->sData), + .minTid = {.suid = TARRAY2_FIRST(writer->sData->suid), .uid = TARRAY2_FIRST(writer->sData->uid)}, + .maxTid = {.suid = TARRAY2_LAST(writer->sData->suid), .uid = TARRAY2_LAST(writer->sData->uid)}, + // .minVer = TARRAY2_FIRST(writer->sData->aVer), + // .maxVer = TARRAY2_FIRST(writer->sData->aVer), + }}; - statisBlk->nRow = writer->sData->nRow; - statisBlk->minTid.suid = writer->sData->aData[0][0]; - statisBlk->minTid.uid = writer->sData->aData[1][0]; - statisBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1]; - statisBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1]; - statisBlk->minVer = statisBlk->maxVer = statisBlk->maxVer = writer->sData->aData[2][0]; - for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) { - if (statisBlk->minVer > writer->sData->aData[2][iRow]) statisBlk->minVer = writer->sData->aData[2][iRow]; - if (statisBlk->maxVer < writer->sData->aData[2][iRow]) statisBlk->maxVer = writer->sData->aData[2][iRow]; - } + // statisBlk->minVer = statisBlk->maxVer = statisBlk->maxVer = writer->sData->aData[2][0]; + // for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) { + // if (statisBlk->minVer > writer->sData->aData[2][iRow]) statisBlk->minVer = writer->sData->aData[2][iRow]; + // if (statisBlk->maxVer < writer->sData->aData[2][iRow]) statisBlk->maxVer = writer->sData->aData[2][iRow]; + // } - statisBlk->dp.offset = writer->file.size; - statisBlk->dp.size = 0; + // statisBlk->dp.offset = writer->file->size; + // statisBlk->dp.size = 0; - // TODO: add compression here - int64_t tsize = sizeof(int64_t) * writer->sData->nRow; - for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)writer->sData->aData[i], tsize); - TSDB_CHECK_CODE(code, lino, _exit); + // // TODO: add compression here + // int64_t tsize = sizeof(int64_t) * writer->sData->nRow; + // for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) { + // code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->sData->aData[i], tsize); + // TSDB_CHECK_CODE(code, lino, _exit); - statisBlk->dp.size += tsize; - writer->file.size += tsize; - } - tTbStatisBlockClear(writer->sData); + // statisBlk->dp.size += tsize; + // writer->file->size += tsize; + // } + tStatisBlockClear(writer->sData); code = TARRAY2_APPEND_P(writer->statisBlkArray, statisBlk); TSDB_CHECK_CODE(code, lino, _exit); @@ -361,6 +362,7 @@ _exit: } static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { +#if 0 if (writer->dData->nRow == 0) return 0; int32_t code = 0; @@ -379,16 +381,16 @@ static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { if (delBlk->maxVer < writer->sData->aData[2][iRow]) delBlk->maxVer = writer->sData->aData[2][iRow]; } - delBlk->dp.offset = writer->file.size; + delBlk->dp.offset = writer->file->size; delBlk->dp.size = 0; // TODO int64_t tsize = sizeof(int64_t) * writer->dData->nRow; for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->aData); i++) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)writer->dData->aData[i], tsize); + code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->dData->aData[i], tsize); TSDB_CHECK_CODE(code, lino, _exit); delBlk->dp.size += tsize; - writer->file.size += tsize; + writer->file->size += tsize; } tDelBlockDestroy(writer->dData); @@ -403,21 +405,23 @@ _exit: // tsdbTrace(); } return code; +#endif + return 0; } static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer->sttBlkPtr->offset = writer->file.size; + writer->footer->sttBlkPtr->offset = writer->file->size; writer->footer->sttBlkPtr->size = sizeof(SSttBlk) * TARRAY2_SIZE(writer->sttBlkArray); if (writer->footer->sttBlkPtr->size) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray), + code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray), writer->footer->sttBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->footer->sttBlkPtr->size; + writer->file->size += writer->footer->sttBlkPtr->size; } _exit: @@ -432,14 +436,14 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer->statisBlkPtr->offset = writer->file.size; + writer->footer->statisBlkPtr->offset = writer->file->size; writer->footer->statisBlkPtr->size = sizeof(STbStatisBlock) * TARRAY2_SIZE(writer->statisBlkArray); if (writer->footer->statisBlkPtr->size) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray), + code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray), writer->footer->statisBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->footer->statisBlkPtr->size; + writer->file->size += writer->footer->statisBlkPtr->size; } _exit: @@ -454,14 +458,14 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - writer->footer->delBlkPtr->offset = writer->file.size; + writer->footer->delBlkPtr->offset = writer->file->size; writer->footer->delBlkPtr->size = sizeof(SDelBlk) * TARRAY2_SIZE(writer->delBlkArray); if (writer->footer->delBlkPtr->size) { - code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray), + code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray), writer->footer->delBlkPtr->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += writer->footer->delBlkPtr->size; + writer->file->size += writer->footer->delBlkPtr->size; } _exit: @@ -473,8 +477,9 @@ _exit: } static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { - int32_t code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)&writer->footer, sizeof(writer->footer)); - writer->file.size += sizeof(writer->footer); + int32_t code = + tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)&writer->footer, sizeof(writer->footer)); + writer->file->size += sizeof(writer->footer); return code; } @@ -484,39 +489,39 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t vid = TD_VID(writer->config->tsdb->pVnode); // set - writer->file = writer->config->file; - writer->file.stt.nseg++; - if (!writer->config->skmTb) writer->config->skmTb = &writer->skmTb; - if (!writer->config->skmRow) writer->config->skmRow = &writer->skmRow; + writer->file[0] = writer->config->file; + writer->file->stt->nseg++; + if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; + if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; if (!writer->config->aBuf) writer->config->aBuf = writer->aBuf; // open file int32_t flag; char fname[TSDB_FILENAME_LEN]; - if (writer->file.size) { + if (writer->file->size) { flag = TD_FILE_READ | TD_FILE_WRITE; } else { flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; } - tsdbTFileName(writer->config->tsdb, &writer->file, fname); + tsdbTFileName(writer->config->tsdb, writer->file, fname); code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd); TSDB_CHECK_CODE(code, lino, _exit); - if (!writer->file.size) { + if (!writer->file->size) { uint8_t hdr[TSDB_FHDR_SIZE] = {0}; code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr)); TSDB_CHECK_CODE(code, lino, _exit); - writer->file.size += sizeof(hdr); + writer->file->size += sizeof(hdr); } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } else { - writer->ctx.opened = true; + writer->ctx->opened = true; } return 0; } @@ -572,11 +577,11 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) { tsdbCloseFile(&writer->fd); - ASSERT(writer->config->file.size > writer->file.size); + ASSERT(writer->config->file.size > writer->file->size); op->optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE; op->fid = writer->config->file.fid; op->of = writer->config->file; - op->nf = writer->file; + op->nf = writer->file[0]; _exit: if (code) { @@ -590,8 +595,8 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) { tsdbTFileName(writer->config->tsdb, &writer->config->file, fname); if (writer->config->file.size) { // truncate the file to the original size - ASSERT(writer->config->file.size <= writer->file.size); - if (writer->config->file.size < writer->file.size) { + ASSERT(writer->config->file.size <= writer->file->size); + if (writer->config->file.size < writer->file->size) { taosFtruncateFile(writer->fd->pFD, writer->config->file.size); tsdbCloseFile(&writer->fd); } @@ -604,11 +609,11 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) { } int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) { - writer[0] = taosMemoryMalloc(sizeof(*writer[0])); + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; writer[0]->config[0] = config[0]; - writer[0]->ctx.opened = false; + writer[0]->ctx->opened = false; return 0; } @@ -617,7 +622,7 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op) int32_t lino = 0; int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode); - if (!writer[0]->ctx.opened) { + if (!writer[0]->ctx->opened) { op->optype = TSDB_FOP_NONE; } else { if (abort) { @@ -643,65 +648,65 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) { int32_t code = 0; int32_t lino = 0; - if (!writer->ctx.opened) { + if (!writer->ctx->opened) { code = tsdbSttFWriterDoOpen(writer); TSDB_CHECK_CODE(code, lino, _exit); } - TABLEID *tbid = (TABLEID *)row; TSDBROW *pRow = &row->row; TSDBKEY key = TSDBROW_KEY(pRow); - if (!TABLE_SAME_SCHEMA(writer->bData[0].suid, writer->bData[0].uid, tbid->suid, tbid->uid)) { - if (writer->bData[0].nRow > 0) { - code = tsdbSttFileDoWriteTSDataBlock(writer); - TSDB_CHECK_CODE(code, lino, _exit); - } + if (!TABLE_SAME_SCHEMA(writer->bData->suid, writer->bData->uid, row->suid, row->uid)) { + code = tsdbSttFileDoWriteTSDataBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); - if (writer->sData[0].nRow >= writer->config->maxRow) { + if (STATIS_BLOCK_SIZE(writer->sData) >= writer->config->maxRow) { code = tsdbSttFileDoWriteStatisBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } - writer->sData[0].aData[0][writer->sData[0].nRow] = tbid->suid; // suid - writer->sData[0].aData[1][writer->sData[0].nRow] = tbid->uid; // uid - writer->sData[0].aData[2][writer->sData[0].nRow] = key.ts; // skey - writer->sData[0].aData[3][writer->sData[0].nRow] = key.version; // sver - writer->sData[0].aData[4][writer->sData[0].nRow] = key.ts; // ekey - writer->sData[0].aData[5][writer->sData[0].nRow] = key.version; // ever - writer->sData[0].aData[6][writer->sData[0].nRow] = 1; // count - writer->sData[0].nRow++; - - code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb); + STbStatisRecord record[1] = {{ + .suid = row->suid, + .uid = row->uid, + .firstKey = key.ts, + .firstVer = key.version, + .lastKey = key.ts, + .lastVer = key.version, + .minVer = key.version, + .maxVer = key.version, + .count = 1, + }}; + code = tStatisBlockPut(writer->sData, record); TSDB_CHECK_CODE(code, lino, _exit); - TABLEID id = { - .suid = tbid->suid, - .uid = tbid->uid ? 0 : tbid->uid, - }; - code = tBlockDataInit(&writer->bData[0], &id, writer->config->skmTb->pTSchema, NULL, 0); + code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid}; + code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } if (row->row.type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(writer->config->tsdb, tbid, TSDBROW_SVERSION(pRow), writer->config->skmRow); + code = tsdbUpdateSkmRow(writer->config->tsdb, (TABLEID *)row, TSDBROW_SVERSION(pRow), writer->config->skmRow); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(&writer->bData[0], pRow, writer->config->skmRow->pTSchema, tbid->uid); + code = tBlockDataAppendRow(writer->bData, pRow, writer->config->skmRow->pTSchema, row->uid); TSDB_CHECK_CODE(code, lino, _exit); - if (writer->bData[0].nRow >= writer->config->maxRow) { + if (writer->bData->nRow >= writer->config->maxRow) { code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } - if (key.ts > writer->sData[0].aData[4][writer->sData[0].nRow - 1]) { - writer->sData[0].aData[4][writer->sData[0].nRow - 1] = key.ts; // ekey - writer->sData[0].aData[5][writer->sData[0].nRow - 1] = key.version; // ever - writer->sData[0].aData[6][writer->sData[0].nRow - 1]++; // count - } else if (key.ts == writer->sData[0].aData[4][writer->sData[0].nRow - 1]) { - writer->sData[0].aData[4][writer->sData[0].nRow - 1] = key.ts; // ekey - writer->sData[0].aData[5][writer->sData[0].nRow - 1] = key.version; // ever + TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key.version); + TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key.version); + if (key.ts > TARRAY2_LAST(writer->sData->lastKey)) { + TARRAY2_LAST(writer->sData->lastKey) = key.ts; + TARRAY2_LAST(writer->sData->lastVer) = key.version; + TARRAY2_LAST(writer->sData->aCount)++; + } else if (key.ts == TARRAY2_LAST(writer->sData->lastKey)) { + TARRAY2_LAST(writer->sData->lastVer) = key.version; } else { ASSERTS(0, "timestamp should be in ascending order"); } @@ -740,21 +745,22 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDe ASSERTS(0, "TODO: Not implemented yet"); int32_t code; - if (!writer->ctx.opened) { + if (!writer->ctx->opened) { code = tsdbSttFWriterDoOpen(writer); return code; } - writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid - writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid - writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version - writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey - writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey - writer->dData[0].nRow++; + // writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid + // writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid + // writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version + // writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey + // writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey + // writer->dData[0].nRow++; - if (writer->dData[0].nRow >= writer->config->maxRow) { - return tsdbSttFileDoWriteDelBlock(writer); - } else { - return 0; - } + // if (writer->dData[0].nRow >= writer->config->maxRow) { + // return tsdbSttFileDoWriteDelBlock(writer); + // } else { + // return 0; + // } + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c index 8acb6a1d72..a709b52a4d 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c @@ -16,67 +16,82 @@ #include "dev.h" // SDelBlock ---------- -int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity) { - int32_t code; - - memset(pDelBlock, 0, sizeof(*pDelBlock)); - pDelBlock->capacity = capacity; - for (int32_t i = 0; i < ARRAY_SIZE(pDelBlock->aData); ++i) { - if ((code = tRealloc((uint8_t **)&pDelBlock->aData[i], sizeof(int64_t) * capacity))) { - for (i--; i >= 0; --i) tFree(pDelBlock->aData[i]); - return code; - } - } - - return 0; -} - -int32_t tDelBlockDestroy(SDelBlock *pDelBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(pDelBlock->aData); ++i) { - tFree(pDelBlock->aData[i]); +int32_t tDelBlockInit(SDelBlock *delBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { + TARRAY2_INIT(&delBlock->aData[i]); } return 0; } -int32_t tDelBlockClear(SDelBlock *pDelBlock) { - pDelBlock->nRow = 0; +int32_t tDelBlockFree(SDelBlock *delBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { + TARRAY2_FREE(&delBlock->aData[i]); + } return 0; } -int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData) { - ASSERT(pDelBlock->nRow < pDelBlock->capacity); - pDelBlock->aData[0][pDelBlock->nRow] = tbid->suid; - pDelBlock->aData[1][pDelBlock->nRow] = tbid->uid; - pDelBlock->aData[2][pDelBlock->nRow] = pDelData->version; - pDelBlock->aData[3][pDelBlock->nRow] = pDelData->sKey; - pDelBlock->aData[4][pDelBlock->nRow] = pDelData->eKey; - pDelBlock->nRow++; +int32_t tDelBlockClear(SDelBlock *delBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { + TARRAY2_CLEAR(&delBlock->aData[i], NULL); + } + return 0; +} + +int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) { + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { + int32_t code = TARRAY2_APPEND(&delBlock->aData[i], delRecord->aData[i]); + if (code) return code; + } + return 0; +} + +int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size) { + // TODO + return 0; +} + +int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock) { + // TODO return 0; } // STbStatisBlock ---------- - -int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity) { - memset(pTbStatisBlock, 0, sizeof(*pTbStatisBlock)); - pTbStatisBlock->capacity = capacity; - for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) { - if (tRealloc((uint8_t **)&pTbStatisBlock->aData[i], sizeof(int64_t) * capacity)) { - for (i--; i >= 0; --i) tFree(pTbStatisBlock->aData[i]); - return TSDB_CODE_OUT_OF_MEMORY; - } +int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { + TARRAY2_INIT(&statisBlock->aData[i]); } return 0; } -int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) { - tFree(pTbStatisBlock->aData[i]); +int32_t tStatisBlockFree(STbStatisBlock *statisBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { + TARRAY2_FREE(&statisBlock->aData[i]); } return 0; } -int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock) { - pTbStatisBlock->nRow = 0; +int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { + TARRAY2_CLEAR(&statisBlock->aData[i], NULL); + } + return 0; +} + +int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *statisRecord) { + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { + int32_t code = TARRAY2_APPEND(&statisBlock->aData[i], statisRecord->aData[i]); + if (code) return code; + } + return 0; +} + +int32_t tStatisBlockEncode(STbStatisBlock *statisBlock, void *buf, int32_t size) { + // TODO + return 0; +} + +int32_t tStatisBlockDecode(const void *buf, STbStatisBlock *statisBlock) { + // TODO return 0; }