From ae9698e1c71b8509b291145b68094dac07121039 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 2 Jun 2022 12:52:16 +0000 Subject: [PATCH] feat: vnode multi-version --- source/dnode/vnode/src/inc/tsdb.h | 2 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 107 +++++++------- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 154 ++++++++------------ 3 files changed, 120 insertions(+), 143 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a62b4c4409..ab23b7506e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -43,6 +43,8 @@ typedef struct SMemTable SMemTable; int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy2(SMemTable *pMemTable); +int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk); +int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); // tsdbMemTable ================ typedef struct STsdbRow STsdbRow; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 86929fe6d5..9e5a5a751b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -87,6 +87,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return 0; @@ -100,57 +101,6 @@ int tsdbBegin(STsdb *pTsdb) { return 0; } -int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { - SDiskID did; - SDFileSet nSet = {0}; - STsdbFS *pfs = REPO_FS(pRepo); - int level; - - ASSERT(pSet->fid >= pRtn->minFid); - - level = tsdbGetFidLevel(pSet->fid, pRtn); - - if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) { - terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; - return -1; - } - - if (did.level > TSDB_FSET_LEVEL(pSet)) { - // Need to move the FSET to higher level - tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs)); - - if (tsdbCopyDFileSet(pSet, &nSet) < 0) { - tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); - return -1; - } - - if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { - return -1; - } - - tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); - } else { - // On a correct level - if (tsdbUpdateDFileSet(pfs, pSet) < 0) { - return -1; - } - } - - return 0; -} - -int tsdbPrepareCommit(STsdb *pTsdb) { - if (pTsdb->mem == NULL) return 0; - - ASSERT(pTsdb->imem == NULL); - - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - return 0; -} - int tsdbCommit(STsdb *pRepo) { SCommitH commith = {0}; SDFileSet *pSet = NULL; @@ -223,6 +173,57 @@ int tsdbCommit(STsdb *pRepo) { return 0; } +static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { + SDiskID did; + SDFileSet nSet = {0}; + STsdbFS *pfs = REPO_FS(pRepo); + int level; + + ASSERT(pSet->fid >= pRtn->minFid); + + level = tsdbGetFidLevel(pSet->fid, pRtn); + + if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + if (did.level > TSDB_FSET_LEVEL(pSet)) { + // Need to move the FSET to higher level + tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs)); + + if (tsdbCopyDFileSet(pSet, &nSet) < 0) { + tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); + return -1; + } + + if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { + return -1; + } + + tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); + } else { + // On a correct level + if (tsdbUpdateDFileSet(pfs, pSet) < 0) { + return -1; + } + } + + return 0; +} + +int tsdbPrepareCommit(STsdb *pTsdb) { + if (pTsdb->mem == NULL) return 0; + + ASSERT(pTsdb->imem == NULL); + + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; + return 0; +} + void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); TSKEY minKey, midKey, maxKey, now; @@ -543,8 +544,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid return -1; } - tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); + tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), + TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); } else { pCommith->isRFileSet = false; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index f4d4d94142..3bad5fdd92 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -64,9 +64,10 @@ static int memDataPCmprFn(const void *p1, const void *p2); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); -static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos); -static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward, - SMemSkipListNode **pos); +static void memDataGetPosToPut(SMemData *pMemData, SMemSkipListNode **backward, TSDBROW *pRow); +static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t isForward); +static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, + SVSubmitBlk *pSubmitBlk); // SMemTable ============================================== int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { @@ -123,28 +124,9 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit } // do insert - int32_t nt; - int32_t n = 0; - uint8_t *p = pSubmitBlk->pData; - int32_t nRow = 0; - SMemSkipListNode *pos[SL_MAX_LEVEL] = {0}; - - for (int8_t iLevel = 0; iLevel < SL_MAX_LEVEL; iLevel++) { - pos[iLevel] = pMemData->sl.pTail; - } - while (n < pSubmitBlk->nData) { - nt = tGetTSRow(p + n, &row.tsRow); - n += nt; - - ASSERT(n <= pSubmitBlk->nData); - - memDataMovePos(pMemData, &row, nRow ? 1 : 0, pos); - code = memDataPutRow(pTsdb->pVnode->inUse, pMemData, &row, nRow ? 1 : 0, pos); - if (code) { - goto _err; - } - - nRow++; + code = tsdbInsertTableDataImpl(pMemTable, pMemData, version, pSubmitBlk); + if (code) { + goto _err; } return code; @@ -310,87 +292,79 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { return level; } -static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos) { - TSDBKEY *pKey; - int c; +static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, + SVSubmitBlk *pSubmitBlk) { + int32_t code = 0; + int32_t n = 0; + uint8_t *p = pSubmitBlk->pData; + int32_t nRow = 0; + TSDBROW row = {.version = version}; + + SMemSkipListNode *backward[SL_MAX_LEVEL]; + SMemSkipListNode *forward[SL_MAX_LEVEL]; + + ASSERT(pSubmitBlk->nData); + + // backward put first data + n += tGetTSRow(p + n, &row.tsRow); + ASSERT(n <= pSubmitBlk->nData); + + memDataGetPosToPut(pMemData, backward, &row); + code = memDataDoPut(pMemData, backward, &row, 0); + if (code) { + goto _exit; + } + nRow++; + + // forward put rest + while (n < pSubmitBlk->nData) { + n += tGetTSRow(p + n, &row.tsRow); + ASSERT(n <= pSubmitBlk->nData); - if (isForward) { // TODO - } else { - SMemSkipListNode *px = pMemData->sl.pTail; + nRow++; + } + +_exit: + return code; +} + +static void memDataGetPosToPut(SMemData *pMemData, SMemSkipListNode **backward, TSDBROW *pRow) { + SMemSkipListNode *px; + SMemSkipListNode *pn; + TSDBKEY *pKey; + int c; + + for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel - 1; iLevel++) { + backward[iLevel] = pMemData->sl.pTail; + } + + if (0) { + } else { + px = pMemData->sl.pTail; for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { if (iLevel < pMemData->sl.level) { - SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel); - - while (p != pMemData->sl.pHead) { - pKey = (TSDBKEY *)SL_NODE_DATA(p); + pn = SL_NODE_BACKWARD(px, iLevel); + while (pn != pMemData->sl.pHead) { + pKey = (TSDBKEY *)SL_NODE_DATA(pn); c = tsdbKeyCmprFn(pKey, pRow); if (c <= 0) { break; } else { - px = p; - p = SL_NODE_BACKWARD(px, iLevel); + px = pn; + pn = SL_NODE_BACKWARD(px, iLevel); } } - - pos[iLevel] = px; } + backward[iLevel] = px; } } } -static void memMovePosFrom(SMemData *pMemData, SMemSkipListNode *pNode, TSDBROW *pRow, int8_t isForward, - SMemSkipListNode **pos) { - SMemSkipListNode *px = pNode; - TSDBKEY *pKey; - SMemSkipListNode *p; - int c; - - if (isForward) { - } else { - ASSERT(pNode != pMemData->sl.pHead); - - for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) { - p = SL_NODE_BACKWARD(px, iLevel); - while (p != pMemData->sl.pHead) { - pKey = (TSDBKEY *)SL_NODE_DATA(p); - - c = tsdbKeyCmprFn(pKey, pRow); - if (c <= 0) { - break; - } else { - px = p; - p = SL_NODE_BACKWARD(px, iLevel); - } - } - - pos[iLevel] = px; - } - } -} - -static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward, - SMemSkipListNode **pos) { - int32_t code = 0; - int8_t level; - SMemSkipListNode *pNode; - - level = tsdbMemSkipListRandLevel(&pMemData->sl); - pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); - if (pNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - // do the read put - if (isForward) { - // TODO - } else { - // TODO - } - -_exit: +static int32_t memDataDoPut(SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t isForward) { + int32_t code = 0; + // TODO return code; } \ No newline at end of file