From 80bd6409abbfc6472f1bd32dfc207ec2f1e60bcf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Jun 2022 08:49:25 +0000 Subject: [PATCH] refact --- source/dnode/vnode/src/inc/tsdb.h | 48 -- source/dnode/vnode/src/inc/vnodeInt.h | 14 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 83 ++- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 530 -------------------- 4 files changed, 59 insertions(+), 616 deletions(-) delete mode 100644 source/dnode/vnode/src/tsdb/tsdbMemTable2.c diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ea1f38f37e..101f3fdc69 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -61,25 +61,6 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); -// tsdbMemTable2.c ============================================================================================== -// typedef struct SMemTable2 SMemTable2; -// typedef struct SMemData SMemData; -// typedef struct SMemDataIter SMemDataIter; - -// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable); -// void tsdbMemTableDestroy2(SMemTable2 *pMemTable); -// int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk); -// int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); - -// /* SMemDataIter */ -// void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter); -// bool tsdbMemDataIterNext(SMemDataIter *pIter); -// void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow); - -// // tsdbCommit2.c ============================================================================================== -// int32_t tsdbBegin2(STsdb *pTsdb); -// int32_t tsdbCommit2(STsdb *pTsdb); - // tsdbFile.c ============================================================================================== typedef int32_t TSDB_FILE_T; typedef struct SDFInfo SDFInfo; @@ -700,17 +681,6 @@ typedef struct { TSKEY eKey; } SDelInfo; -struct SMemTable2 { - STsdb *pTsdb; - int32_t nRef; - TSDBKEY minKey; - TSDBKEY maxKey; - int64_t nRows; - int64_t nDelOp; - SArray *aSkmInfo; - SArray *aMemData; -}; - static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey2 = (TSDBKEY *)p2; @@ -730,24 +700,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) { return 0; } -struct SMemData { - tb_uid_t suid; - tb_uid_t uid; - TSDBKEY minKey; - TSDBKEY maxKey; - SDelOp *delOpHead; - SDelOp *delOpTail; - SMemSkipList sl; -}; - -struct SMemDataIter { - STbData *pMemData; - int8_t backward; - TSDBROW *pRow; - SMemSkipListNode *pNode; // current node - TSDBROW row; -}; - struct STbDataIter { STbData *pTbData; int8_t backward; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 77005767fe..de2fc03f69 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -112,7 +112,7 @@ int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbClose(STsdb** pTsdb); -int tsdbBegin(STsdb* pTsdb); +int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); @@ -161,18 +161,6 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); void tdUidStoreDestory(STbUidStore* pStore); void* tdUidStoreFree(STbUidStore* pStore); -#if 0 -int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version); -int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); -int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); -int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb); -int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); -int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); -void tsdbUidStoreDestory(STbUidStore* pStore); -void* tsdbUidStoreFree(STbUidStore* pStore); -int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType); -#endif - typedef struct { int8_t streamType; // sma or other int8_t dstType; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 865023e628..0a83fb126d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -58,6 +58,9 @@ typedef struct { #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) +static int32_t tsdbCommitData(SCommitH *pCommith); +static int32_t tsdbCommitDel(SCommitH *pCommith); +static int32_t tsdbCommitCache(SCommitH *pCommith); static void tsdbStartCommit(STsdb *pRepo); static void tsdbEndCommit(STsdb *pTsdb, int eno); static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); @@ -89,7 +92,7 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); -int tsdbBegin(STsdb *pTsdb) { +int32_t tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return 0; SMemTable *pMem; @@ -117,10 +120,42 @@ int32_t tsdbCommit(STsdb *pTsdb) { return -1; } + // commit impl + code = tsdbCommitData(&commith); + if (code) { + goto _err; + } + + code = tsdbCommitDel(&commith); + if (code) { + goto _err; + } + + code = tsdbCommitCache(&commith); + if (code) { + goto _err; + } + + // end commit + tsdbDestroyCommitH(&commith); + tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS); + + return code; + +_err: + return code; +} + +static int32_t tsdbCommitData(SCommitH *pCommith) { + int32_t fid; + SDFileSet *pSet; + int32_t code = 0; + STsdb *pTsdb = TSDB_COMMIT_REPO(pCommith); + // Skip expired memory data and expired FSET - tsdbSeekCommitIter(&commith, commith.rtn.minKey); - while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) { - if (pSet->fid < commith.rtn.minFid) { + tsdbSeekCommitIter(pCommith, pCommith->rtn.minKey); + while ((pSet = tsdbFSIterNext(&(pCommith->fsIter)))) { + if (pSet->fid < pCommith->rtn.minFid) { tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); } else { @@ -129,7 +164,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { } // commit - fid = tsdbNextCommitFid(&(commith)); + fid = tsdbNextCommitFid(pCommith); while (true) { // Loop over both on disk and memory if (pSet == NULL && fid == TSDB_IVLD_FID) break; @@ -137,12 +172,12 @@ int32_t tsdbCommit(STsdb *pTsdb) { if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { // Only has existing FSET but no memory data to commit in this // existing FSET, only check if file in correct retention - if (tsdbApplyRtnOnFSet(pTsdb, pSet, &(commith.rtn)) < 0) { - tsdbDestroyCommitH(&commith); + if (tsdbApplyRtnOnFSet(TSDB_COMMIT_REPO(pCommith), pSet, &(pCommith->rtn)) < 0) { + tsdbDestroyCommitH(pCommith); return -1; } - pSet = tsdbFSIterNext(&(commith.fsIter)); + pSet = tsdbFSIterNext(&(pCommith->fsIter)); } else { // Has memory data to commit SDFileSet *pCSet; @@ -156,22 +191,30 @@ int32_t tsdbCommit(STsdb *pTsdb) { // Commit to an existing FSET pCSet = pSet; cfid = pSet->fid; - pSet = tsdbFSIterNext(&(commith.fsIter)); + pSet = tsdbFSIterNext(&(pCommith->fsIter)); } - if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) { - tsdbDestroyCommitH(&commith); + if (tsdbCommitToFile(pCommith, pCSet, cfid) < 0) { + tsdbDestroyCommitH(pCommith); return -1; } - fid = tsdbNextCommitFid(&commith); + fid = tsdbNextCommitFid(pCommith); } } - // end commit - tsdbDestroyCommitH(&commith); - tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS); + return code; +} +static int32_t tsdbCommitDel(SCommitH *pCommith) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitCache(SCommitH *pCommith) { + int32_t code = 0; + // TODO return code; } @@ -216,16 +259,6 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { return 0; } -// int tsdbPrepareCommit(STsdb *pTsdb) { -// if (pTsdb->mem == NULL) return 0; - -// ASSERT(pTsdb->imem == NULL); - -// pTsdb->imem = pTsdb->mem; -// pTsdb->mem = NULL; -// return 0; -// } - void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); TSKEY minKey, midKey, maxKey, now; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c deleted file mode 100644 index aa58cc92a0..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ /dev/null @@ -1,530 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -typedef struct { - tb_uid_t uid; - STSchema *pTSchema; -} SSkmInfo; - -#define SL_MAX_LEVEL 5 - -#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) -#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) -#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) -#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) - -#define SL_MOVE_BACKWARD 0x1 -#define SL_MOVE_FROM_POS 0x2 - -static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); -static int memDataPCmprFn(const void *p1, const void *p2); -static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); -static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); -static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); -static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version, - SVSubmitBlk *pSubmitBlk); -static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); - -// SMemTable ============================================== -int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable) { - int32_t code = 0; - SMemTable2 *pMemTable = NULL; - - pMemTable = (SMemTable2 *)taosMemoryCalloc(1, sizeof(*pMemTable)); - if (pMemTable == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pMemTable->pTsdb = pTsdb; - pMemTable->nRef = 1; - pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX}; - pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN}; - pMemTable->nRows = 0; - pMemTable->nDelOp = 0; - pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *)); - if (pMemTable->aMemData == NULL) { - taosMemoryFree(pMemTable); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - *ppMemTable = pMemTable; - return code; - -_err: - *ppMemTable = NULL; - return code; -} - -void tsdbMemTableDestroy2(SMemTable2 *pMemTable) { - taosArrayDestroyEx(pMemTable->aMemData, NULL /*TODO*/); - taosMemoryFree(pMemTable); -} - -int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk) { - int32_t code = 0; - SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO - SMemData *pMemData; - TSDBROW row = {.version = version}; - - ASSERT(pMemTable); - ASSERT(pSubmitBlk->nData > 0); - - { - // check if table exists (todo) - } - - code = tsdbGetOrCreateMemData(pMemTable, pSubmitBlk->suid, pSubmitBlk->uid, &pMemData); - if (code) { - tsdbError("vgId:%d, failed to create/get table data since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - goto _err; - } - - // do insert - code = tsdbInsertTableDataImpl(pMemTable, pMemData, version, pSubmitBlk); - if (code) { - goto _err; - } - - return code; - -_err: - return code; -} - -int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { - int32_t code = 0; - SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO - SMemData *pMemData; - SVBufPool *pPool = pTsdb->pVnode->inUse; - - ASSERT(pMemTable); - - { - // check if table exists (todo) - } - - code = tsdbGetOrCreateMemData(pMemTable, suid, uid, &pMemData); - if (code) { - goto _err; - } - - // do delete - SDelOp *pDelOp = (SDelOp *)vnodeBufPoolMalloc(pPool, sizeof(*pDelOp)); - if (pDelOp == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pDelOp->version = version; - pDelOp->sKey = sKey; - pDelOp->eKey = eKey; - pDelOp->pNext = NULL; - if (pMemData->delOpHead == NULL) { - ASSERT(pMemData->delOpTail == NULL); - pMemData->delOpHead = pMemData->delOpTail = pDelOp; - } else { - pMemData->delOpTail->pNext = pDelOp; - pMemData->delOpTail = pDelOp; - } - - { - // update the state of pMemTable, pMemData, last and lastrow (todo) - } - - pMemTable->nDelOp++; - - tsdbDebug("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); - return code; - -_err: - tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); - return code; -} - -void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter) { - SMemSkipListNode *pos[SL_MAX_LEVEL]; - - pIter->pMemData = pMemData; - pIter->backward = backward; - pIter->pRow = NULL; - if (pKey == NULL) { - // create from head or tail - if (backward) { - pIter->pNode = SL_NODE_BACKWARD(pMemData->sl.pTail, 0); - } else { - pIter->pNode = SL_NODE_FORWARD(pMemData->sl.pHead, 0); - } - } else { - // create from a key - if (backward) { - memDataMovePosTo(pMemData, pos, pKey, SL_MOVE_BACKWARD); - pIter->pNode = SL_NODE_BACKWARD(pos[0], 0); - } else { - memDataMovePosTo(pMemData, pos, pKey, 0); - pIter->pNode = SL_NODE_FORWARD(pos[0], 0); - } - } -} - -bool tsdbMemDataIterNext(SMemDataIter *pIter) { - SMemSkipListNode *pHead = pIter->pMemData->sl.pHead; - SMemSkipListNode *pTail = pIter->pMemData->sl.pTail; - - pIter->pRow = NULL; - if (pIter->backward) { - ASSERT(pIter->pNode != pTail); - - if (pIter->pNode == pHead) { - return false; - } - - pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); - if (pIter->pNode == pHead) { - return false; - } - } else { - ASSERT(pIter->pNode != pHead); - - if (pIter->pNode == pTail) { - return false; - } - - pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); - if (pIter->pNode == pTail) { - return false; - } - } - - return true; -} - -void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) { - if (pIter->pRow) { - *ppRow = pIter->pRow; - } else { - SMemSkipListNode *pHead = pIter->pMemData->sl.pHead; - SMemSkipListNode *pTail = pIter->pMemData->sl.pTail; - - if (pIter->backward) { - ASSERT(pIter->pNode != pTail); - - if (pIter->pNode == pHead) { - *ppRow = NULL; - } else { - tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); - *ppRow = &pIter->row; - } - } else { - ASSERT(pIter->pNode != pHead); - - if (pIter->pNode == pTail) { - *ppRow = NULL; - } else { - tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); - *ppRow = &pIter->row; - } - } - } -} - -static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) { - int32_t code = 0; - int32_t idx = 0; - SMemData *pMemDataT = &(SMemData){.suid = suid, .uid = uid}; - SMemData *pMemData = NULL; - SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; - int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel; - - // get - idx = taosArraySearchIdx(pMemTable->aMemData, &pMemDataT, memDataPCmprFn, TD_GE); - if (idx >= 0) { - pMemData = (SMemData *)taosArrayGet(pMemTable->aMemData, idx); - if (memDataPCmprFn(&pMemDataT, &pMemData) == 0) goto _exit; - } - - // create - pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData) + SL_NODE_SIZE(maxLevel) * 2); - if (pMemData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pMemData->suid = suid; - pMemData->uid = uid; - pMemData->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX}; - pMemData->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN}; - pMemData->delOpHead = pMemData->delOpTail = NULL; - pMemData->sl.seed = taosRand(); - pMemData->sl.size = 0; - pMemData->sl.maxLevel = maxLevel; - pMemData->sl.level = 0; - pMemData->sl.pHead = (SMemSkipListNode *)&pMemData[1]; - pMemData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pMemData->sl.pHead, SL_NODE_SIZE(maxLevel)); - pMemData->sl.pHead->level = maxLevel; - pMemData->sl.pTail->level = maxLevel; - - for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) { - SL_NODE_FORWARD(pMemData->sl.pHead, iLevel) = pMemData->sl.pTail; - SL_NODE_BACKWARD(pMemData->sl.pHead, iLevel) = NULL; - SL_NODE_BACKWARD(pMemData->sl.pTail, iLevel) = pMemData->sl.pHead; - SL_NODE_FORWARD(pMemData->sl.pTail, iLevel) = NULL; - } - - if (idx < 0) idx = 0; - if (taosArrayInsert(pMemTable->aMemData, idx, &pMemData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - -_exit: - *ppMemData = pMemData; - return code; - -_err: - *ppMemData = NULL; - return code; -} - -static int memDataPCmprFn(const void *p1, const void *p2) { - SMemData *pMemData1 = *(SMemData **)p1; - SMemData *pMemData2 = *(SMemData **)p2; - - if (pMemData1->suid < pMemData2->suid) { - return -1; - } else if (pMemData1->suid > pMemData2->suid) { - return 1; - } - - if (pMemData1->uid < pMemData2->uid) { - return -1; - } else if (pMemData1->uid > pMemData2->uid) { - return 1; - } - - return 0; -} - -static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { - int32_t n = 0; - - n += tPutI64(p ? p + n : p, pRow->version); - n += tPutTSRow(p ? p + n : p, &pRow->tsRow); - - return n; -} - -static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) { - int32_t n = 0; - - n += tGetI64(p + n, &pRow->version); - n += tGetTSRow(p + n, &pRow->tsRow); - - return n; -} - -static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { - int8_t level = 1; - int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - const uint32_t factor = 4; - - while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { - level++; - } - - return level; -} - -static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) { - SMemSkipListNode *px; - SMemSkipListNode *pn; - TSDBKEY *pTKey; - int c; - int backward = flags & SL_MOVE_BACKWARD; - int fromPos = flags & SL_MOVE_FROM_POS; - - if (backward) { - px = pMemData->sl.pTail; - - for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= pMemData->sl.level; iLevel--) { - pos[iLevel] = px; - } - - if (pMemData->sl.level) { - if (fromPos) px = pos[pMemData->sl.level - 1]; - - for (int8_t iLevel = pMemData->sl.level - 1; iLevel >= 0; iLevel--) { - pn = SL_NODE_BACKWARD(px, iLevel); - while (pn != pMemData->sl.pHead) { - pTKey = (TSDBKEY *)SL_NODE_DATA(pn); - - c = tsdbKeyCmprFn(pTKey, pKey); - if (c <= 0) { - break; - } else { - px = pn; - pn = SL_NODE_BACKWARD(px, iLevel); - } - } - - pos[iLevel] = px; - } - } - } else { - px = pMemData->sl.pHead; - - for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= pMemData->sl.level; iLevel--) { - pos[iLevel] = px; - } - - if (pMemData->sl.level) { - if (fromPos) px = pos[pMemData->sl.level - 1]; - - for (int8_t iLevel = pMemData->sl.level - 1; iLevel >= 0; iLevel--) { - pn = SL_NODE_FORWARD(px, iLevel); - while (pn != pMemData->sl.pHead) { - pTKey = (TSDBKEY *)SL_NODE_DATA(pn); - - c = tsdbKeyCmprFn(pTKey, pKey); - if (c >= 0) { - break; - } else { - px = pn; - pn = SL_NODE_FORWARD(px, iLevel); - } - } - - pos[iLevel] = px; - } - } - } -} - -static int32_t memDataDoPut(SMemTable2 *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, - int8_t forward) { - int32_t code = 0; - int8_t level; - SMemSkipListNode *pNode; - SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; - - // node - level = tsdbMemSkipListRandLevel(&pMemData->sl); - pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); - if (pNode == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pNode->level = level; - for (int8_t iLevel = 0; iLevel < level; iLevel++) { - SL_NODE_FORWARD(pNode, iLevel) = NULL; - SL_NODE_BACKWARD(pNode, iLevel) = NULL; - } - - tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow); - - // put - for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) { - SMemSkipListNode *px = pos[iLevel]; - - if (forward) { - SMemSkipListNode *pNext = SL_NODE_FORWARD(px, iLevel); - - SL_NODE_FORWARD(pNode, iLevel) = pNext; - SL_NODE_BACKWARD(pNode, iLevel) = px; - - SL_NODE_BACKWARD(pNext, iLevel) = pNode; - SL_NODE_FORWARD(px, iLevel) = pNode; - } else { - SMemSkipListNode *pPrev = SL_NODE_BACKWARD(px, iLevel); - - SL_NODE_FORWARD(pNode, iLevel) = px; - SL_NODE_BACKWARD(pNode, iLevel) = pPrev; - - SL_NODE_FORWARD(pPrev, iLevel) = pNode; - SL_NODE_BACKWARD(px, iLevel) = pNode; - } - } - - pMemData->sl.size++; - if (pMemData->sl.level < pNode->level) { - pMemData->sl.level = pNode->level; - } - -_exit: - return code; -} - -static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version, - SVSubmitBlk *pSubmitBlk) { - int32_t code = 0; - int32_t n = 0; - uint8_t *p = pSubmitBlk->pData; - int32_t nRow = 0; - TSDBROW row = {.version = version}; - - SMemSkipListNode *pos[SL_MAX_LEVEL]; - - ASSERT(pSubmitBlk->nData); - - // backward put first data - n += tGetTSRow(p + n, &row.tsRow); - ASSERT(n <= pSubmitBlk->nData); - - memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD); - code = memDataDoPut(pMemTable, pMemData, pos, &row, 0); - if (code) { - goto _exit; - } - nRow++; - - if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemData->minKey) < 0) { - pMemData->minKey = *(TSDBKEY *)&row; - } - if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemTable->minKey) < 0) { - pMemTable->minKey = *(TSDBKEY *)&row; - } - - // forward put rest - for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) { - pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel); - } - while (n < pSubmitBlk->nData) { - n += tGetTSRow(p + n, &row.tsRow); - ASSERT(n <= pSubmitBlk->nData); - - memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS); - code = memDataDoPut(pMemTable, pMemData, pos, &row, 1); - if (code) { - goto _exit; - } - - nRow++; - } - - if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemData->maxKey) > 0) { - pMemData->maxKey = *(TSDBKEY *)&row; - } - if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemTable->maxKey) > 0) { - pMemTable->maxKey = *(TSDBKEY *)&row; - } - pMemTable->nRows += nRow; - -_exit: - return code; -}