From bb0f20f6e69d5b21f2f2143bc78c8538945b48ef Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 14 Jun 2022 03:38:20 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 3 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 38 +++++++++++----------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 14 +++++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 15 +++++++++ 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index cb4c04eec5..bd6d6ff856 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -59,7 +59,7 @@ void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter); -void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); +bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); @@ -172,6 +172,7 @@ int32_t tPutBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx); int32_t tGetBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx); // SBlock +int32_t tBlockCmprFn(const void *p1, const void *p2); // SDelIdx int32_t tDelIdxClear(SDelIdx *pDelIdx); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 46dd2e1c89..7fb337a7fe 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -39,6 +39,8 @@ struct SCommitter { SMapData oBlockIdx; SMapData nBlockIdx; // commit table data + STbDataIter iter; + STbDataIter *pIter; SBlockIdx *pBlockIdx; SMapData oBlock; SMapData nBlock; @@ -503,7 +505,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } } - if (pTbData && pTbData->sl.size == 0) { + if (pTbData && !tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}), 0, + &pCommitter->iter) { pTbData = NULL; } @@ -613,26 +616,23 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) { if (code) goto _err; } - // merge loop - for (;;) { - tsdbTbDataIterGet(pIter, pRow); - - code = tsdbColDataBlockAppend(&pCommitter->nColDataBlock, pRow, pTSchema); - if (code) goto _err; - - if (pCommitter->nColDataBlock.nRow >= pCommitter->maxRow) { - code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL); - if (code) goto _err; - - tsdbColDataBlockReset(&pCommitter->nColDataBlock); - } - - if (!tsdbTbDataIterNext(pIter)) break; + if (iBlock < nBlock) { + pBlock = █ + } else { + pBlock = NULL; } - if (pCommitter->nColDataBlock.nRow) { - code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL); - if (code) goto _err; + tsdbTbDataIterGet(pIter, pRow); + + // loop to merge memory data and disk data + for (; pBlock == NULL || (pRow && pRow->pTSRow->ts <= pCommitter->maxKey);) { + if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) { + // only has block data, then move to new index file + } else if (0) { + // only commit memory data + } else { + // merge memory and block data + } } tsdbTbDataIterDestroy(pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 04990f6396..007c78cae5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -211,9 +211,15 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) { return NULL; } -void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) { +bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) { SMemSkipListNode *pos[SL_MAX_LEVEL]; + SMemSkipListNode *pHead; + SMemSkipListNode *pTail; + if (pTbData == NULL) return false; + + pHead = pTbData->sl.pHead; + pTail = pTbData->sl.pTail; pIter->pTbData = pTbData; pIter->backward = backward; if (pFrom == NULL) { @@ -233,6 +239,12 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa pIter->pNode = SL_NODE_FORWARD(pos[0], 0); } } + + if ((backward && pIter->pNode == pHead) || (!backward && pIter->pNode == pTail)) { + return false; + } + + return true; } bool tsdbTbDataIterNext(STbDataIter *pIter) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e378d7dd5a..0f399a8be8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -484,6 +484,21 @@ int32_t tGetBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx) { return n; } +// SBlock ====================================================== +int32_t tBlockCmprFn(const void *p1, const void *p2) { + int32_t c; + SBlock *pBlock1 = (SBlock *)p1; + SBlock *pBlock2 = (SBlock *)p2; + + if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) { + return -1; + } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) { + return 1 + } + + return 0; +} + // SDelIdx ====================================================== int32_t tDelIdxClear(SDelIdx *pDelIdx) { int32_t code = 0;