From ebc3236c835e5ed65e63d7743c0adfc917ddef18 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 1 Jun 2022 10:34:17 +0000 Subject: [PATCH] feat: vnode multi-version --- source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/tsdb/tsdbCommit.c | 12 + source/dnode/vnode/src/tsdb/tsdbCommit2.c | 28 --- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 246 +++++++------------- 4 files changed, 101 insertions(+), 186 deletions(-) delete mode 100644 source/dnode/vnode/src/tsdb/tsdbCommit2.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index f23ead85df..253723ba10 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -36,7 +36,6 @@ target_sources( # tsdb "src/tsdb/tsdbCommit.c" - "src/tsdb/tsdbCommit2.c" "src/tsdb/tsdbFile.c" "src/tsdb/tsdbFS.c" "src/tsdb/tsdbOpen.c" diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0a85cb4638..26bf7f7568 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -88,6 +88,18 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +int tsdbBegin(STsdb *pTsdb) { + if (!pTsdb) return 0; + + STsdbMemTable *pMem; + + if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { + return -1; + } + + return 0; +} + int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; SDFileSet nSet = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c deleted file mode 100644 index 844cfc094b..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -int tsdbBegin(STsdb *pTsdb) { - if (!pTsdb) return 0; - - STsdbMemTable *pMem; - - if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { - return -1; - } - - return 0; -} diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index 94c88a14ff..d1c26f085b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -52,6 +52,8 @@ struct SMemTable { SArray *pArray; // SArray }; +#define SL_MAX_LEVEL 5 + #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) #define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) @@ -66,6 +68,9 @@ static int memDataPCmprFn(const void *p1, const void *p2); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); +static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos); +static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward, + SMemSkipListNode **pos); // SMemTable ============================================== int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { @@ -109,6 +114,7 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit TSDBROW row = {.version = version}; ASSERT(pMemTable); + ASSERT(pSubmitBlk->nData > 0); { // check if table exists (todo) @@ -122,38 +128,29 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit // do insert int32_t nt; - uint8_t *pt; int32_t n = 0; uint8_t *p = pSubmitBlk->pData; - SVBufPool *pPool = pTsdb->pVnode->inUse; - int8_t level; - SMemSkipListNode *pNode; + int32_t nRow = 0; + SMemSkipListNode *pos[SL_MAX_LEVEL] = {0}; + + for (int8_t iLevel = 0; iLevel < SL_MAX_LEVEL; iLevel++) { + pos[iLevel] = pMemData->sl.pTail; + } while (n < pSubmitBlk->nData) { nt = tGetTSRow(p + n, &row.tsRow); n += nt; ASSERT(n <= pSubmitBlk->nData); - // build the node - level = tsdbMemSkipListRandLevel(&pMemData->sl); - pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + nt + sizeof(version)); - if (pNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + memDataMovePos(pMemData, &row, nRow ? 1 : 0, pos); + code = memDataPutRow(pTsdb->pVnode->inUse, pMemData, &row, nRow ? 1 : 0, pos); + if (code) { goto _err; } - pNode->level = level; - tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), &row); - // put the node (todo) - - // set info - if (tsdbKeyCmprFn(&row, &pMemData->minKey) < 0) pMemData->minKey = *(TSDBKEY *)&row; - if (tsdbKeyCmprFn(&row, &pMemData->maxKey) > 0) pMemData->maxKey = *(TSDBKEY *)&row; + nRow++; } - if (tsdbKeyCmprFn(&pMemTable->minKey, &pMemData->minKey) < 0) pMemTable->minKey = pMemData->minKey; - if (tsdbKeyCmprFn(&pMemTable->maxKey, &pMemData->maxKey) > 0) pMemTable->maxKey = pMemData->maxKey; - return code; _err: @@ -313,41 +310,81 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { return level; } +static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos) { + TSDBKEY *pKey; + int c; + + if (isForward) { + for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { + if (iLevel < pMemData->sl.level) { + SMemSkipListNode *px = pos[iLevel]; + SMemSkipListNode *p = SL_NODE_FORWARD(px, iLevel); + + while (p != pMemData->sl.pTail) { + pKey = (TSDBKEY *)SL_NODE_DATA(p); + + c = tsdbKeyCmprFn(pKey, pRow); + if (c >= 0) { + break; + } else { + px = p; + p = SL_NODE_FORWARD(px, iLevel); + } + } + + pos[iLevel] = px; + } + } + } else { + for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { + if (iLevel < pMemData->sl.level) { + SMemSkipListNode *px = pos[iLevel]; + SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel); + + while (p != pMemData->sl.pHead) { + pKey = (TSDBKEY *)SL_NODE_DATA(p); + + c = tsdbKeyCmprFn(pKey, pRow); + if (c <= 0) { + break; + } else { + px = p; + p = SL_NODE_BACKWARD(px, iLevel); + } + } + + pos[iLevel] = px; + } + } + } +} + +static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward, + SMemSkipListNode **pos) { + int32_t code = 0; + int8_t level; + SMemSkipListNode *pNode; + + level = tsdbMemSkipListRandLevel(&pMemData->sl); + pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); + if (pNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // do the read put + if (isForward) { + // TODO + } else { + // TODO + } + +_exit: + return code; +} + #if 0 //==================================================================================== -#define SL_MAX_LEVEL 5 - -struct SMemSkipListCurosr { - SMemSkipList *pSl; - SMemSkipListNode *pNodes[SL_MAX_LEVEL]; -}; - -typedef struct { - int64_t version; - uint32_t szRow; - const STSRow *pRow; -} STsdbRow; - -#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET)) - -#define SL_HEAD_NODE(sl) ((sl)->pHead) -#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) -#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l) -#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l) - -static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); -static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow); -static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow); -static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc); -static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc); -static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl); -static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode); -static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags); -static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc); -static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc); -static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc); -static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc); -static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow); // SMemTable ======================== int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { @@ -460,28 +497,6 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p return 0; } -static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) { - if (tEncodeI64(pEncoder, pRow->version) < 0) return -1; - if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1; - return 0; -} - -static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) { - if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1; - if (tDecodeBinary(pDecoder, (uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1; - return 0; -} - -static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) { - *ppSlc = (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(**ppSlc) + sizeof(SMemSkipListNode *) * maxLevel); - if (*ppSlc == NULL) { - return -1; - } - return 0; -} - -static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) { taosMemoryFree(pSlc); } - static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) { SMemSkipListNode *pHead = SL_HEAD_NODE(pSl); pSlc->pSl = pSl; @@ -490,87 +505,4 @@ static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pS // } } -static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) { - SMemSkipList *pSl = pSlc->pSl; - SMemSkipListNode *pNodeNext; - - for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) { - // todo - - ASSERT(0); - } - - if (pSl->level < pNode->level) { - pSl->level = pNode->level; - } - - pSl->size += 1; -} - -static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags) { - SMemSkipListNode **pForwards = NULL; - SMemSkipList *pSl = pSlc->pSl; - int8_t maxLevel = pSl->maxLevel; - SMemSkipListNode *pHead = SL_HEAD_NODE(pSl); - SMemSkipListNode *pTail = SL_TAIL_NODE(pSl); - - if (pSl->size == 0) { - for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) { - pForwards[iLevel] = pHead; - } - } - - return 0; -} - -static void tsdbMemSkipListCursorMoveToFirst(SMemSkipListCurosr *pSlc) { - SMemSkipList *pSl = pSlc->pSl; - SMemSkipListNode *pHead = SL_HEAD_NODE(pSl); - - for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) { - pSlc->pNodes[iLevel] = pHead; - } - - tsdbMemSkipListCursorMoveToNext(pSlc); -} - -static void tsdbMemSkipListCursorMoveToLast(SMemSkipListCurosr *pSlc) { - SMemSkipList *pSl = pSlc->pSl; - SMemSkipListNode *pTail = SL_TAIL_NODE(pSl); - - for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) { - pSlc->pNodes[iLevel] = pTail; - } - - tsdbMemSkipListCursorMoveToPrev(pSlc); -} - -static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) { - // TODO - return 0; -} - -static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) { - // TODO - return 0; -} - -static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) { - int32_t tsize; - int32_t ret; - int8_t level = tsdbMemSkipListRandLevel(pSl); - SMemSkipListNode *pNode = NULL; - SEncoder ec = {0}; - - tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret); - pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level)); - if (pNode) { - pNode->level = level; - tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize); - tsdbEncodeRow(&ec, pTRow); - tEncoderClear(&ec); - } - - return pNode; -} #endif \ No newline at end of file