diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index ba01a57fea..8dc3f46ae3 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -39,8 +39,6 @@ target_sources( # tsdb "src/tsdb/tsdbCommit.c" - "src/tsdb/tsdbCommit2.c" - "src/tsdb/tsdbMerge.c" "src/tsdb/tsdbFile.c" "src/tsdb/tsdbFS.c" "src/tsdb/tsdbOpen.c" diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c similarity index 53% rename from source/dnode/vnode/src/tsdb/tsdbCommit2.c rename to source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c index 77495e59ac..04a29fe76e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit2.c @@ -18,7 +18,6 @@ typedef struct { STsdb *pTsdb; // config - int64_t commitID; int32_t minutes; int8_t precision; int32_t minRow; @@ -27,11 +26,12 @@ typedef struct { int8_t sttTrigger; SArray *aTbDataP; // context - TSKEY nextKey; // reset by each table commit + TSKEY nextKey; int32_t fid; int32_t expLevel; TSKEY minKey; TSKEY maxKey; + int64_t cid; // commit id SSkmInfo skmTable; SSkmInfo skmRow; SBlockData bData; @@ -48,13 +48,89 @@ static int32_t tsdbRowIsDeleted(SCommitter *pCommitter, TSDBROW *pRow) { static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) { int32_t code = 0; - int32_t lino = 0; + int32_t lino; - // TODO + SMemTable *pMem = pCommitter->pTsdb->imem; + + if (pMem->nRow == 0) goto _exit; + + for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { + STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); + + // TODO: prepare commit next table + + STbDataIter iter; + TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN}; + tsdbTbDataIterOpen(pTbData, &from, 0, &iter); + + for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) { + TSDBKEY rowKey = TSDBROW_KEY(pRow); + + if (rowKey.ts > pCommitter->maxKey) { + pCommitter->nextKey = TMIN(rowKey.ts, pCommitter->nextKey); + break; + } + + if (pRow->type == TSDBROW_ROW_FMT) { + // code = tsdbUpdateSkmInfo(&pCommitter->skmRow, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataAppendRow(&pCommitter->bData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCommitter->bData.nRow >= pCommitter->maxRow) { + // code = tsdbWriteSttBlock(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); + + tBlockDataClear(&pCommitter->bData); + } + } + } _exit: if (code) { tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, + pMem->nRow); + } + return code; +} + +static int32_t tsdbCommitTombstoneData(SCommitter *pCommitter) { + int32_t code = 0; + int32_t lino; + + SMemTable *pMem = pCommitter->pTsdb->imem; + + if (pMem->nDel == 0) goto _exit; + + for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { + STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); + + if (pTbData->pHead == NULL) continue; + + for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) { + if (pDelData->sKey > pCommitter->maxKey || pDelData->eKey < pCommitter->minKey) continue; + + // code = tsdbAppendDelData(pCommitter, pTbData->suid, pTbData->uid, TMAX(pDelData->sKey, pCommitter->minKey), + // TMIN(pDelData->eKey, pCommitter->maxKey), pDelData->version); + TSDB_CHECK_CODE(code, lino, _exit); + + if (/* TODO */ 0 > pCommitter->maxRow) { + // code = tsdbWriteDelBlock(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + +_exit: + if (code) { + tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, + pMem->nDel); } return code; } @@ -72,19 +148,62 @@ _exit: return code; } -static int32_t tsdbCommitNextFSet(SCommitter *pCommitter, int8_t *done) { +static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) { + int32_t code = 0; + int32_t lino = 0; + + pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); + tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, + &pCommitter->maxKey); + pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); +#if 0 + // pCommitter->cid = tsdbFileSetNextCid(STsdb * pTsdb, pCommitter->fid); +#else + pCommitter->cid = 0; +#endif + + // TODO + +_exit: + if (code) { + tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbCommitFSetEnd(SCommitter *pCommitter) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; - // fset commit start (TODO) + // fset commit start + code = tsdbCommitFSetStart(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); // commit fset code = tsdbCommitTimeSeriesData(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); - // fset commit end (TODO) + code = tsdbCommitTombstoneData(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); + + // fset commit end + code = tsdbCommitFSetEnd(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -140,12 +259,11 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) { tsdbUnrefMemTable(pMem, NULL, true); } else { SCommitter committer; - int8_t done = 0; code = tsdbCommitterOpen(pTsdb, pInfo, &committer); TSDB_CHECK_CODE(code, lino, _exit); - while (!done && (code = tsdbCommitNextFSet(&committer, &done))) { + while (committer.nextKey != TSKEY_MAX && (code = tsdbCommitNextFSet(&committer))) { } code = tsdbCommitterClose(&committer, code); diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c similarity index 100% rename from source/dnode/vnode/src/tsdb/tsdbMerge.c rename to source/dnode/vnode/src/tsdb/dev/tsdbMerge.c diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c b/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c new file mode 100644 index 0000000000..1f78d23580 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/dev/tsdbReaderWriter2.c @@ -0,0 +1,63 @@ +#include "tsdb.h" + +typedef struct SSttFWriter SSttFWriter; +typedef struct SSttFReader SSttFReader; + +extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); +extern void tsdbCloseFile(STsdbFD **ppFD); +struct SSttFWriter { + STsdb *pTsdb; + STsdbFD *pFd; + SSttFile file; +}; + +int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) { + int32_t code = 0; + int32_t lino = 0; + + int32_t szPage = pTsdb->pVnode->config.tsdbPageSize; + int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO + + ppWritter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter)); + if (ppWritter[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + ppWritter[0]->pTsdb = pTsdb; + ppWritter[0]->file = pSttFile[0]; + + code = tsdbOpenFile(NULL, szPage, flag, &ppWritter[0]->pFd); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return 0; +} + +int32_t tsdbSttFWriterClose(SSttFWriter *pWritter) { + // TODO + return 0; +} + +int32_t tsdbWriteSttBlockData(SSttFWriter *pWritter, SBlockData *pBlockData, SSttBlk *pSttBlk) { + // TODO + return 0; +} + +int32_t tsdbWriteSttBlockIdx(SSttFWriter *pWriter, SArray *aSttBlk) { + // TODO + return 0; +} + +int32_t tsdbWriteSttDelData(SSttFWriter *pWriter) { + // TODO + return 0; +} + +int32_t tsdbWriteSttDelIdx(SSttFWriter *pWriter) { + // TODO + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 50fd9d7aa7..75d12a3b32 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -16,7 +16,7 @@ #include "tsdb.h" // =============== PAGE-WISE FILE =============== -static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { +int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; STsdbFD *pFD = NULL; @@ -68,7 +68,7 @@ _exit: return code; } -static void tsdbCloseFile(STsdbFD **ppFD) { +void tsdbCloseFile(STsdbFD **ppFD) { STsdbFD *pFD = *ppFD; if (pFD) { taosMemoryFree(pFD->pBuf);