From 638aa9cdccc97c476488503a10ccc1f7b0689a8f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 5 Jun 2022 13:19:42 +0000 Subject: [PATCH 01/17] refact --- source/dnode/vnode/src/inc/tsdb.h | 8 +++++--- source/dnode/vnode/src/tsdb/tsdbFile.c | 3 +-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 171d52164c..6ce1e1e6e0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -237,9 +237,11 @@ struct STsdbFSMeta { // ================== typedef struct { - STsdbFSMeta meta; // FS meta - SArray *df; // data file array - SArray *sf; // sma data file array v2f1900.index_name_1 + STsdbFSMeta meta; // FS meta + SDFile cacheFile; // cache file + SDFile tombstone; // tomestome file + SArray *df; // data file array + SArray *sf; // sma data file array v2f1900.index_name_1 } SFSStatus; struct STsdbFS { diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index a88cb8a353..11d206dc35 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -25,8 +25,7 @@ static const char *TSDB_FNAME_SUFFIX[] = { "meta", // TSDB_FILE_META }; -static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname); -// static int tsdbRollBackMFile(SMFile *pMFile); +static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); static int tsdbRollBackDFile(SDFile *pDFile); From 9ea8640b2435d7c3dcaf92091b791e2766132bdf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 5 Jun 2022 13:50:10 +0000 Subject: [PATCH 02/17] refact --- source/dnode/vnode/src/inc/tsdb.h | 52 ++++++++++----------- source/dnode/vnode/src/tsdb/tsdbCommit.c | 6 +-- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 38 +++++++-------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 12 ++--- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 36 +++++++------- source/dnode/vnode/src/tsdb/tsdbRead.c | 10 ++-- 6 files changed, 77 insertions(+), 77 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6ce1e1e6e0..228d9ffa90 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -39,13 +39,24 @@ typedef struct SDelOp SDelOp; static int tsdbKeyCmprFn(const void *p1, const void *p2); +// tsdbMemTable ================ +typedef struct STbData STbData; +typedef struct SMemTable SMemTable; +typedef struct SMergeInfo SMergeInfo; +typedef struct STable STable; + +int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); +void tsdbMemTableDestroy(SMemTable *pMemTable); +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); + // tsdbMemTable2.c ============================================================================================== -typedef struct SMemTable SMemTable; +typedef struct SMemTable2 SMemTable2; typedef struct SMemData SMemData; typedef struct SMemDataIter SMemDataIter; -int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable); -void tsdbMemTableDestroy2(SMemTable *pMemTable); +int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable); +void tsdbMemTableDestroy2(SMemTable2 *pMemTable); int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk); int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); @@ -124,17 +135,6 @@ int tsdbRLockFS(STsdbFS *pFs); int tsdbWLockFS(STsdbFS *pFs); int tsdbUnLockFS(STsdbFS *pFs); -// tsdbMemTable ================ -typedef struct STbData STbData; -typedef struct STsdbMemTable STsdbMemTable; -typedef struct SMergeInfo SMergeInfo; -typedef struct STable STable; - -int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); -void tsdbMemTableDestroy(STsdbMemTable *pMemTable); -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); - // structs typedef struct { int minFid; @@ -145,16 +145,16 @@ typedef struct { #define TSDB_DATA_DIR_LEN 6 // adapt accordingly struct STsdb { - char *path; - SVnode *pVnode; - TdThreadMutex mutex; - char dir[TSDB_DATA_DIR_LEN]; - bool repoLocked; - STsdbKeepCfg keepCfg; - STsdbMemTable *mem; - STsdbMemTable *imem; - SRtn rtn; - STsdbFS *fs; + char *path; + SVnode *pVnode; + TdThreadMutex mutex; + char dir[TSDB_DATA_DIR_LEN]; + bool repoLocked; + STsdbKeepCfg keepCfg; + SMemTable *mem; + SMemTable *imem; + SRtn rtn; + STsdbFS *fs; }; #if 1 // ====================================== @@ -216,7 +216,7 @@ struct STbData { SSkipList *pData; }; -struct STsdbMemTable { +struct SMemTable { SVBufPool *pPool; T_REF_DECLARE() SRWLatch latch; @@ -677,7 +677,7 @@ typedef struct { TSKEY eKey; } SDelInfo; -struct SMemTable { +struct SMemTable2 { STsdb *pTsdb; int32_t nRef; TSDBKEY minKey; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 59a795b77a..06c75d029d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -92,7 +92,7 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return 0; - STsdbMemTable *pMem; + SMemTable *pMem; if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { return -1; @@ -244,7 +244,7 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { } static void tsdbStartCommit(STsdb *pRepo) { - STsdbMemTable *pMem = pRepo->imem; + SMemTable *pMem = pRepo->imem; tsdbInfo("vgId:%d, start to commit", REPO_ID(pRepo)); @@ -455,7 +455,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int tsdbCreateCommitIters(SCommitH *pCommith) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbMemTable *pMem = pRepo->imem; + SMemTable *pMem = pRepo->imem; SSkipListIterator *pSlIter; SCommitIter *pCommitIter; SSkipListNode *pNode; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 321667c55b..07d4ef8656 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -16,19 +16,19 @@ #include "tsdb.h" typedef struct { - SMemTable *pMemTable; - int32_t minutes; - int8_t precision; - TSKEY nCommitKey; - int32_t fid; - TSKEY minKey; - TSKEY maxKey; - SReadH readh; - SDFileSet wSet; - SArray *aBlkIdx; - SArray *aSupBlk; - SArray *aSubBlk; - SArray *aDelInfo; + SMemTable2 *pMemTable; + int32_t minutes; + int8_t precision; + TSKEY nCommitKey; + int32_t fid; + TSKEY minKey; + TSKEY maxKey; + SReadH readh; + SDFileSet wSet; + SArray *aBlkIdx; + SArray *aSupBlk; + SArray *aSubBlk; + SArray *aDelInfo; } SCommitH; static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb); @@ -39,7 +39,7 @@ int32_t tsdbBegin2(STsdb *pTsdb) { int32_t code = 0; ASSERT(pTsdb->mem == NULL); - code = tsdbMemTableCreate2(pTsdb, (SMemTable **)&pTsdb->mem); + code = tsdbMemTableCreate2(pTsdb, (SMemTable2 **)&pTsdb->mem); if (code) { tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); goto _exit; @@ -80,8 +80,8 @@ _err: } static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) { - int32_t code = 0; - SMemTable *pMemTable = (SMemTable *)pTsdb->mem; + int32_t code = 0; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode)); @@ -131,9 +131,9 @@ _err: } static int32_t tsdbCommitEnd(SCommitH *pCHandle) { - int32_t code = 0; - STsdb *pTsdb = pCHandle->pMemTable->pTsdb; - SMemTable *pMemTable = (SMemTable *)pTsdb->imem; + int32_t code = 0; + STsdb *pTsdb = pCHandle->pMemTable->pTsdb; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->imem; // end transaction code = tsdbEndFSTxn(pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index fa392baa16..d683c7df27 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -23,15 +23,15 @@ static char *tsdbTbDataGetUid(const void *arg); static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge); -int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { - STsdbMemTable *pMemTable; - SVnode *pVnode; +int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { + SMemTable *pMemTable; + SVnode *pVnode; *ppMemTable = NULL; pVnode = pTsdb->pVnode; // alloc handle - pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); + pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { return -1; } @@ -60,7 +60,7 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { return 0; } -void tsdbMemTableDestroy(STsdbMemTable *pMemTable) { +void tsdbMemTableDestroy(SMemTable *pMemTable) { if (pMemTable) { taosHashCleanup(pMemTable->pHashIdx); SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx); @@ -240,7 +240,7 @@ int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { SSubmitBlkIter blkIter = {0}; - STsdbMemTable *pMemTable = pTsdb->mem; + SMemTable *pMemTable = pTsdb->mem; void *tptr; STbData *pTbData; STSRow *row; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index cf8557bba3..24c81b1782 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -35,21 +35,21 @@ typedef struct { #define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_FROM_POS 0x2 -static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); +static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); static int memDataPCmprFn(const void *p1, const void *p2); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); -static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, +static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version, SVSubmitBlk *pSubmitBlk); static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); // SMemTable ============================================== -int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { - int32_t code = 0; - SMemTable *pMemTable = NULL; +int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable) { + int32_t code = 0; + SMemTable2 *pMemTable = NULL; - pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); + pMemTable = (SMemTable2 *)taosMemoryCalloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -75,16 +75,16 @@ _err: return code; } -void tsdbMemTableDestroy2(SMemTable *pMemTable) { +void tsdbMemTableDestroy2(SMemTable2 *pMemTable) { taosArrayDestroyEx(pMemTable->aMemData, NULL /*TODO*/); taosMemoryFree(pMemTable); } int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk) { - int32_t code = 0; - SMemTable *pMemTable = (SMemTable *)pTsdb->mem; // TODO - SMemData *pMemData; - TSDBROW row = {.version = version}; + int32_t code = 0; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO + SMemData *pMemData; + TSDBROW row = {.version = version}; ASSERT(pMemTable); ASSERT(pSubmitBlk->nData > 0); @@ -112,10 +112,10 @@ _err: } int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { - int32_t code = 0; - SMemTable *pMemTable = (SMemTable *)pTsdb->mem; // TODO - SMemData *pMemData; - SVBufPool *pPool = pTsdb->pVnode->inUse; + int32_t code = 0; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO + SMemData *pMemData; + SVBufPool *pPool = pTsdb->pVnode->inUse; ASSERT(pMemTable); @@ -250,7 +250,7 @@ void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) { } } -static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) { +static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) { int32_t code = 0; int32_t idx = 0; SMemData *pMemDataT = &(SMemData){.suid = suid, .uid = uid}; @@ -421,7 +421,7 @@ static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY } } -static int32_t memDataDoPut(SMemTable *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, +static int32_t memDataDoPut(SMemTable2 *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward) { int32_t code = 0; int8_t level; @@ -475,7 +475,7 @@ _exit: return code; } -static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, +static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version, SVSubmitBlk *pSubmitBlk) { int32_t code = 0; int32_t n = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 56220beea4..7693e2bce1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -200,8 +200,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - int64_t rows = 0; - STsdbMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; + int64_t rows = 0; + SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; if (pMemTable == NULL) { return rows; } @@ -658,7 +658,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL } #if 0 -tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) { +tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) { STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef); if (pTsdbReadHandle == NULL) { return NULL; @@ -2918,7 +2918,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { // current result is empty if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey && pTsdbReadHandle->cur.rows == 0) { - // STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable; + // SMemTable* pMemRef = pTsdbReadHandle->pMemTable; // doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef); // doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef); @@ -3216,7 +3216,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { } } -// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) { +// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) { // STsdbReadHandle* pSecQueryHandle = NULL; // // if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) { From f81d1027478f5c6eac6581b3b62d5f8bbb15257a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 01:24:14 +0000 Subject: [PATCH 03/17] refact --- source/dnode/vnode/src/inc/tsdb.h | 19 ++++++++++--------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 ++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++-- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 228d9ffa90..9ab50ad62f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -206,12 +206,18 @@ struct SDFileSet { SDFile files[TSDB_FILE_MAX]; }; +struct TSDBKEY { + int64_t version; + TSKEY ts; +}; + struct STbData { + tb_uid_t suid; tb_uid_t uid; - TSKEY keyMin; - TSKEY keyMax; - int64_t minVer; - int64_t maxVer; + TSDBKEY minKey; + TSDBKEY maxKey; + SDelOp *pHead; + SDelOp *pTail; int64_t nrows; SSkipList *pData; }; @@ -313,11 +319,6 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { // tsdbReadImpl typedef struct SReadH SReadH; -struct TSDBKEY { - int64_t version; - TSKEY ts; -}; - typedef struct { uint64_t suid; uint64_t uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d683c7df27..26f750a774 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -305,8 +305,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo keyMax = TD_ROW_KEY(blkIter.row); pTbData->nrows += pMsgIter->numOfRows; - if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin; - if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax; + if (pTbData->minKey.ts > keyMin) pTbData->minKey.ts = keyMin; + if (pTbData->maxKey.ts < keyMax) pTbData->maxKey.ts = keyMax; pMemTable->nRow += pMsgIter->numOfRows; if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; @@ -326,8 +326,8 @@ static STbData *tsdbNewTbData(tb_uid_t uid) { } pTbData->uid = uid; - pTbData->keyMin = TSKEY_MAX; - pTbData->keyMax = TSKEY_MIN; + pTbData->minKey.ts = TSKEY_MAX; + pTbData->maxKey.ts = TSKEY_MIN; pTbData->nrows = 0; #if 0 pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY, diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7693e2bce1..6a9f2dc9a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -791,7 +791,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", - pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, + pHandle, pCheckInfo->tableId, key, order, (*pMem)->minKey.ts, (*pMem)->maxKey.ts, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { @@ -812,7 +812,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", - pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, + pHandle, pCheckInfo->tableId, key, order, (*pIMem)->minKey.ts, (*pIMem)->maxKey.ts, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { From 5b64b2921fd989f48696a75d95d4ff30f32c0707 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 01:46:27 +0000 Subject: [PATCH 04/17] more --- source/dnode/vnode/src/inc/tsdb.h | 6 ++---- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 ++++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9ab50ad62f..329b93e73c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -226,10 +226,8 @@ struct SMemTable { SVBufPool *pPool; T_REF_DECLARE() SRWLatch latch; - TSKEY keyMin; - TSKEY keyMax; - int64_t minVer; - int64_t maxVer; + TSDBKEY minKey; + TSDBKEY maxKey; int64_t nRow; SSkipList *pSlIdx; // SSkiplist SHashObj *pHashIdx; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 26f750a774..1427751eae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -39,8 +39,8 @@ int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { pMemTable->pPool = pTsdb->pVnode->inUse; T_REF_INIT_VAL(pMemTable, 1); taosInitRWLatch(&pMemTable->latch); - pMemTable->keyMin = TSKEY_MAX; - pMemTable->keyMax = TSKEY_MIN; + pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX}; + pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1}; pMemTable->nRow = 0; pMemTable->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid); @@ -309,8 +309,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo if (pTbData->maxKey.ts < keyMax) pTbData->maxKey.ts = keyMax; pMemTable->nRow += pMsgIter->numOfRows; - if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; - if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; + if (pMemTable->minKey.ts > keyMin) pMemTable->minKey.ts = keyMin; + if (pMemTable->maxKey.ts < keyMax) pMemTable->maxKey.ts = keyMax; pRsp->numOfRows = pMsgIter->numOfRows; pRsp->affectedRows = pMsgIter->numOfRows; From 179dc390752161448acaea297acb98e70f56ac43 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 01:51:10 +0000 Subject: [PATCH 05/17] more refact --- source/dnode/vnode/src/inc/tsdb.h | 4 ++-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 329b93e73c..35bdd0e312 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -223,8 +223,8 @@ struct STbData { }; struct SMemTable { - SVBufPool *pPool; - T_REF_DECLARE() + STsdb *pTsdb; + int32_t nRef; SRWLatch latch; TSDBKEY minKey; TSDBKEY maxKey; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 1427751eae..a235a6523b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -36,8 +36,8 @@ int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { return -1; } - pMemTable->pPool = pTsdb->pVnode->inUse; - T_REF_INIT_VAL(pMemTable, 1); + pMemTable->pTsdb = pTsdb; + pMemTable->nRef = 1; taosInitRWLatch(&pMemTable->latch); pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX}; pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1}; @@ -287,7 +287,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo // copy data to buffer pool int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock); - pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, tlen); + pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->pVnode->inUse, tlen); memcpy(pBlkCopy, pBlock, tlen); tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter); From 6d3f41c96ee7b3c92f85445a7d06f9ed1da112fc Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 02:28:58 +0000 Subject: [PATCH 06/17] more --- source/dnode/vnode/src/inc/tsdb.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 35bdd0e312..c593ddda23 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -229,6 +229,9 @@ struct SMemTable { TSDBKEY minKey; TSDBKEY maxKey; int64_t nRow; + int64_t nDelOp; + SDelOp *pHead; + SDelOp *pTail; SSkipList *pSlIdx; // SSkiplist SHashObj *pHashIdx; }; From 52a62d6ae980a751346d33eedef588bc8fc337fd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 07:13:52 +0000 Subject: [PATCH 07/17] more --- source/dnode/vnode/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 05b4a270d6..c92a9d7cb7 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -36,12 +36,12 @@ target_sources( # tsdb "src/tsdb/tsdbCommit.c" - "src/tsdb/tsdbCommit2.c" + # "src/tsdb/tsdbCommit2.c" "src/tsdb/tsdbFile.c" "src/tsdb/tsdbFS.c" "src/tsdb/tsdbOpen.c" "src/tsdb/tsdbMemTable.c" - "src/tsdb/tsdbMemTable2.c" + # "src/tsdb/tsdbMemTable2.c" "src/tsdb/tsdbRead.c" "src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbWrite.c" From 487530636d4e3b73f5b903a1c3c33b56c6654578 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 10:00:48 +0000 Subject: [PATCH 08/17] feat: tsdb multi-version 1 --- source/dnode/vnode/src/inc/tsdb.h | 125 ++-- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 72 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 701 +++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 5 - source/dnode/vnode/src/tsdb/tsdbRead.c | 124 ++-- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 8 files changed, 702 insertions(+), 333 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c593ddda23..b828b1ca78 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -39,35 +39,46 @@ typedef struct SDelOp SDelOp; static int tsdbKeyCmprFn(const void *p1, const void *p2); -// tsdbMemTable ================ -typedef struct STbData STbData; -typedef struct SMemTable SMemTable; -typedef struct SMergeInfo SMergeInfo; -typedef struct STable STable; +// tsdbMemTable ============================================================================================== +typedef struct STbData STbData; +typedef struct SMemTable SMemTable; +typedef struct STbDataIter STbDataIter; +typedef struct SMergeInfo SMergeInfo; +typedef struct STable STable; -int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); -void tsdbMemTableDestroy(SMemTable *pMemTable); -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); +// SMemTable +int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); +void tsdbMemTableDestroy(SMemTable *pMemTable); +void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); + +// STbDataIter +int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); +void *tsdbTbDataIterDestroy(STbDataIter *pIter); +void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); +bool tsdbTbDataIterNext(STbDataIter *pIter); +bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); + +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); // tsdbMemTable2.c ============================================================================================== -typedef struct SMemTable2 SMemTable2; -typedef struct SMemData SMemData; -typedef struct SMemDataIter SMemDataIter; +// typedef struct SMemTable2 SMemTable2; +// typedef struct SMemData SMemData; +// typedef struct SMemDataIter SMemDataIter; -int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable); -void tsdbMemTableDestroy2(SMemTable2 *pMemTable); -int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk); -int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); +// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable); +// void tsdbMemTableDestroy2(SMemTable2 *pMemTable); +// int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk); +// int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -/* SMemDataIter */ -void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter); -bool tsdbMemDataIterNext(SMemDataIter *pIter); -void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow); +// /* SMemDataIter */ +// void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter); +// bool tsdbMemDataIterNext(SMemDataIter *pIter); +// void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow); -// tsdbCommit2.c ============================================================================================== -int32_t tsdbBegin2(STsdb *pTsdb); -int32_t tsdbCommit2(STsdb *pTsdb); +// // tsdbCommit2.c ============================================================================================== +// int32_t tsdbBegin2(STsdb *pTsdb); +// int32_t tsdbCommit2(STsdb *pTsdb); // tsdbFile.c ============================================================================================== typedef int32_t TSDB_FILE_T; @@ -211,29 +222,39 @@ struct TSDBKEY { TSKEY ts; }; +typedef struct SMemSkipListNode SMemSkipListNode; +struct SMemSkipListNode { + int8_t level; + SMemSkipListNode *forwards[0]; +}; +typedef struct SMemSkipList { + uint32_t seed; + int64_t size; + int8_t maxLevel; + int8_t level; + SMemSkipListNode *pHead; + SMemSkipListNode *pTail; +} SMemSkipList; + struct STbData { - tb_uid_t suid; - tb_uid_t uid; - TSDBKEY minKey; - TSDBKEY maxKey; - SDelOp *pHead; - SDelOp *pTail; - int64_t nrows; - SSkipList *pData; + tb_uid_t suid; + tb_uid_t uid; + TSDBKEY minKey; + TSDBKEY maxKey; + SDelOp *pHead; + SDelOp *pTail; + SMemSkipList sl; }; struct SMemTable { - STsdb *pTsdb; - int32_t nRef; - SRWLatch latch; - TSDBKEY minKey; - TSDBKEY maxKey; - int64_t nRow; - int64_t nDelOp; - SDelOp *pHead; - SDelOp *pTail; - SSkipList *pSlIdx; // SSkiplist - SHashObj *pHashIdx; + SRWLatch latch; + STsdb *pTsdb; + int32_t nRef; + TSDBKEY minKey; + TSDBKEY maxKey; + int64_t nRow; + int64_t nDelOp; + SArray *aTbData; // SArray }; struct STsdbFSMeta { @@ -656,7 +677,7 @@ struct SFSIter { struct TSDBROW { int64_t version; - STSRow2 tsRow; + STSRow *pTSRow; }; struct TABLEID { @@ -709,16 +730,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) { return 0; } -typedef struct SMemSkipListNode SMemSkipListNode; -typedef struct SMemSkipList { - uint32_t seed; - int32_t size; - int8_t maxLevel; - int8_t level; - SMemSkipListNode *pHead; - SMemSkipListNode *pTail; -} SMemSkipList; - struct SMemData { tb_uid_t suid; tb_uid_t uid; @@ -730,13 +741,19 @@ struct SMemData { }; struct SMemDataIter { - SMemData *pMemData; + STbData *pMemData; int8_t backward; TSDBROW *pRow; SMemSkipListNode *pNode; // current node TSDBROW row; }; +struct STbDataIter { + STbData *pTbData; + int8_t backward; + SMemSkipListNode *pNode; +}; + #endif #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 15469786e0..77005767fe 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -116,7 +116,9 @@ int tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); +int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, + SSubmitBlkRsp* pRsp); +int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 06c75d029d..b1f6b6143e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -58,26 +58,26 @@ typedef struct { #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) -static void tsdbStartCommit(STsdb *pRepo); -static void tsdbEndCommit(STsdb *pTsdb, int eno); -static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); -static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); -static int tsdbNextCommitFid(SCommitH *pCommith); -static void tsdbDestroyCommitH(SCommitH *pCommith); -static int tsdbCreateCommitIters(SCommitH *pCommith); -static void tsdbDestroyCommitIters(SCommitH *pCommith); -static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static void tsdbResetCommitFile(SCommitH *pCommith); -static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static int tsdbCommitToTable(SCommitH *pCommith, int tid); -static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx); -static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); -static int tsdbComparKeyBlock(const void *arg1, const void *arg2); -static int tsdbWriteBlockInfo(SCommitH *pCommih); -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); -static int tsdbMoveBlock(SCommitH *pCommith, int bidx); +static void tsdbStartCommit(STsdb *pRepo); +static void tsdbEndCommit(STsdb *pTsdb, int eno); +static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); +static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); +static int tsdbNextCommitFid(SCommitH *pCommith); +static void tsdbDestroyCommitH(SCommitH *pCommith); +static int32_t tsdbCreateCommitIters(SCommitH *pCommith); +static void tsdbDestroyCommitIters(SCommitH *pCommith); +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static void tsdbResetCommitFile(SCommitH *pCommith); +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx); +static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +static int tsdbWriteBlockInfo(SCommitH *pCommih); +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMoveBlock(SCommitH *pCommith, int bidx); static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock); @@ -453,11 +453,32 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return 0; } -static int tsdbCreateCommitIters(SCommitH *pCommith) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - SMemTable *pMem = pRepo->imem; +static int32_t tsdbCreateCommitIters(SCommitH *pCommith) { + int32_t code = 0; + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + SMemTable *pMem = pRepo->imem; + SCommitIter *pCommitIter; + + pCommith->niters = taosArrayGetSize(pMem->aTbData); + pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter)); + if (pCommith->iters == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + for (int32_t iIter = 0; iIter < pCommith->niters; iIter++) { + pCommitIter = (SCommitIter *)taosArrayGetP(pMem->aTbData, iIter); + // TODO + + // pCommitIter->pIter = + } + + return code; + +_err: + return code; +#if 0 SSkipListIterator *pSlIter; - SCommitIter *pCommitIter; SSkipListNode *pNode; STbData *pTbData; STSchema *pTSchema = NULL; @@ -495,8 +516,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { } } tSkipListDestroyIter(pSlIter); - - return 0; +#endif } static void tsdbDestroyCommitIters(SCommitH *pCommith) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index a235a6523b..46ad963c2c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -15,69 +15,301 @@ #include "tsdb.h" -static STbData *tsdbNewTbData(tb_uid_t uid); -static void tsdbFreeTbData(STbData *pTbData); -static char *tsdbGetTsTupleKey(const void *data); -static int tsdbTbDataComp(const void *arg1, const void *arg2); -static char *tsdbTbDataGetUid(const void *arg); -static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, - bool merge); +#define SL_MAX_LEVEL 5 -int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { - SMemTable *pMemTable; - SVnode *pVnode; +#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) +#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) +#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) +#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) - *ppMemTable = NULL; - pVnode = pTsdb->pVnode; +#define SL_MOVE_BACKWARD 0x1 +#define SL_MOVE_FROM_POS 0x2 + +static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); +static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); +static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); +static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); +static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, + SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp); + +int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { + int32_t code = 0; + SMemTable *pMemTable = NULL; - // alloc handle pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } - + taosInitRWLatch(&pMemTable->latch); pMemTable->pTsdb = pTsdb; pMemTable->nRef = 1; - taosInitRWLatch(&pMemTable->latch); pMemTable->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX}; pMemTable->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1}; pMemTable->nRow = 0; - pMemTable->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), - tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid); - if (pMemTable->pSlIdx == NULL) { + pMemTable->nDelOp = 0; + pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *)); + if (pMemTable->aTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pMemTable); - return -1; - } - - pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - if (pMemTable->pHashIdx == NULL) { - tSkipListDestroy(pMemTable->pSlIdx); - taosMemoryFree(pMemTable); - return -1; + goto _err; } *ppMemTable = pMemTable; - return 0; + return code; + +_err: + *ppMemTable = NULL; + return code; } void tsdbMemTableDestroy(SMemTable *pMemTable) { if (pMemTable) { - taosHashCleanup(pMemTable->pHashIdx); - SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx); - SSkipListNode *pNode = NULL; - STbData *pTbData = NULL; - for (;;) { - if (!tSkipListIterNext(pIter)) break; - pNode = tSkipListIterGet(pIter); - pTbData = (STbData *)pNode->pData; - tsdbFreeTbData(pTbData); - } - tSkipListDestroyIter(pIter); - tSkipListDestroy(pMemTable->pSlIdx); + taosArrayDestroy(pMemTable->aTbData); taosMemoryFree(pMemTable); } } +static int32_t tbDataPCmprFn(const void *p1, const void *p2) { + STbData *pTbData1 = *(STbData **)p1; + STbData *pTbData2 = *(STbData **)p2; + + if (pTbData1->suid < pTbData2->suid) { + return -1; + } else if (pTbData1->suid > pTbData2->suid) { + return 1; + } + + if (pTbData1->uid < pTbData2->uid) { + return -1; + } else if (pTbData1->uid > pTbData2->uid) { + return 1; + } + + return 0; +} +void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) { + STbData *pTbData = &(STbData){.suid = suid, .uid = uid}; + void *p = taosArraySearch(pMemTable->aTbData, &pTbData, tbDataPCmprFn, TD_EQ); + *ppTbData = p ? *(STbData **)p : NULL; +} + +int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, + SSubmitBlkRsp *pRsp) { + int32_t code = 0; + SMemTable *pMemTable = pTsdb->mem; + STbData *pTbData = NULL; + tb_uid_t suid = pMsgIter->suid; + tb_uid_t uid = pMsgIter->uid; + int32_t sverNew; + + // check if table exists (todo: refact) + SMetaReader mr = {0}; + SMetaEntry me = {0}; + metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) { + metaReaderClear(&mr); + code = TSDB_CODE_PAR_TABLE_NOT_EXIST; + goto _err; + } + if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name); + + if (mr.me.type == TSDB_NORMAL_TABLE) { + sverNew = mr.me.ntbEntry.schemaRow.version; + } else { + metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); + sverNew = mr.me.stbEntry.schemaRow.version; + } + metaReaderClear(&mr); + pRsp->sver = sverNew; + + // create/get STbData to op + code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData); + if (code) { + goto _err; + } + + // do insert impl + code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pMsgIter, pBlock, pRsp); + if (code) { + goto _err; + } + + return code; + +_err: + return code; +} + +int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { + int32_t code = 0; + SMemTable *pMemTable = pTsdb->mem; + STbData *pTbData = NULL; + SVBufPool *pPool = pTsdb->pVnode->inUse; + + // check if table exists (todo) + + code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData); + if (code) { + goto _err; + } + + // do delete + SDelOp *pDelOp = (SDelOp *)vnodeBufPoolMalloc(pPool, sizeof(*pDelOp)); + if (pDelOp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pDelOp->version = version; + pDelOp->sKey = sKey; + pDelOp->eKey = eKey; + pDelOp->pNext = NULL; + if (pTbData->pHead == NULL) { + ASSERT(pTbData->pTail == NULL); + pTbData->pHead = pTbData->pTail = pDelOp; + } else { + pTbData->pTail->pNext = pDelOp; + pTbData->pTail = pDelOp; + } + + // update the state of pMemTable and other (todo) + + pMemTable->nDelOp++; + + tsdbError("vgId:%d delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 + " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + return code; + +_err: + tsdbError("vgId:%d failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 + " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + return code; +} + +static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, + bool merge) { + if (pCols) { + if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { + *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row)); + if (*ppSchema == NULL) { + ASSERT(false); + return -1; + } + } + + tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge); + } + + return 0; +} + +int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) { + int32_t code = 0; + + (*ppIter) = (STbDataIter *)taosMemoryCalloc(1, sizeof(STbDataIter)); + if ((*ppIter) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + tsdbTbDataIterOpen(pTbData, pFrom, backward, *ppIter); + +_exit: + return code; +} + +void *tsdbTbDataIterDestroy(STbDataIter *pIter) { + if (pIter) { + taosMemoryFree(pIter); + } + + return NULL; +} + +void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) { + SMemSkipListNode *pos[SL_MAX_LEVEL]; + + pIter->pTbData = pTbData; + pIter->backward = backward; + if (pFrom == NULL) { + // create from head or tail + if (backward) { + pIter->pNode = SL_NODE_BACKWARD(pTbData->sl.pTail, 0); + } else { + pIter->pNode = SL_NODE_FORWARD(pTbData->sl.pHead, 0); + } + } else { + // create from a key + if (backward) { + tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD); + pIter->pNode = SL_NODE_BACKWARD(pos[0], 0); + } else { + tbDataMovePosTo(pTbData, pos, pFrom, 0); + pIter->pNode = SL_NODE_FORWARD(pos[0], 0); + } + } +} + +bool tsdbTbDataIterNext(STbDataIter *pIter) { + SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; + SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; + + if (pIter->backward) { + ASSERT(pIter->pNode != pTail); + + if (pIter->pNode == pHead) { + return false; + } + + pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); + if (pIter->pNode == pHead) { + return false; + } + } else { + ASSERT(pIter->pNode != pHead); + + if (pIter->pNode == pTail) { + return false; + } + + pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); + if (pIter->pNode == pTail) { + return false; + } + } + + return true; +} + +bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) { + SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; + SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; + TSDBROW row = {0}; + + if (pRow == NULL) { + pRow = &row; + } + + if (pIter->backward) { + ASSERT(pIter->pNode != pTail); + + if (pIter->pNode == pHead) { + return false; + } + } else { + ASSERT(pIter->pNode != pHead); + + if (pIter->pNode == pTail) { + return false; + } + } + + tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), pRow); + return true; +} + /** * This is an important function to load data or try to load data from memory skiplist iterator. * @@ -238,151 +470,276 @@ int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter return 0; } -int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { - SSubmitBlkIter blkIter = {0}; - SMemTable *pMemTable = pTsdb->mem; - void *tptr; - STbData *pTbData; - STSRow *row; - TSKEY keyMin; - TSKEY keyMax; - SSubmitBlk *pBlkCopy; - int64_t sverNew; +static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) { + int32_t code = 0; + int32_t idx = 0; + STbData *pTbData = NULL; + STbData *pTbDataT = &(STbData){.suid = suid, .uid = uid}; - // check if table exists - SMetaReader mr = {0}; - SMetaEntry me = {0}; - metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); - if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) { - metaReaderClear(&mr); - terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; - return -1; + // get + idx = taosArraySearchIdx(pMemTable->aTbData, &pTbDataT, tbDataPCmprFn, TD_GE); + if (idx >= 0) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, idx); + if (tbDataPCmprFn(&pTbDataT, &pTbData) == 0) goto _exit; } - if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name); - if (mr.me.type == TSDB_NORMAL_TABLE) { - sverNew = mr.me.ntbEntry.schemaRow.version; - } else { - metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); - sverNew = mr.me.stbEntry.schemaRow.version; + // create + SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; + int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel; + + pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2); + if (pTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } - metaReaderClear(&mr); + pTbData->suid = suid; + pTbData->uid = uid; + pTbData->minKey = (TSDBKEY){.ts = TSKEY_MAX, .version = INT64_MAX}; + pTbData->maxKey = (TSDBKEY){.ts = TSKEY_MIN, .version = -1}; + pTbData->pHead = NULL; + pTbData->pTail = NULL; + pTbData->sl.seed = taosRand(); + pTbData->sl.size = 0; + pTbData->sl.maxLevel = maxLevel; + pTbData->sl.level = 0; + pTbData->sl.pHead = (SMemSkipListNode *)&pTbData[1]; + pTbData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pTbData->sl.pHead, SL_NODE_SIZE(maxLevel)); + pTbData->sl.pHead->level = maxLevel; + pTbData->sl.pTail->level = maxLevel; + for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) { + SL_NODE_FORWARD(pTbData->sl.pHead, iLevel) = pTbData->sl.pTail; + SL_NODE_BACKWARD(pTbData->sl.pTail, iLevel) = pTbData->sl.pHead; - // create container is nedd - tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid)); - if (tptr == NULL) { - pTbData = tsdbNewTbData(pMsgIter->uid); - if (pTbData == NULL) { - return -1; + SL_NODE_BACKWARD(pTbData->sl.pHead, iLevel) = NULL; + SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL; + } + + if (taosArrayInsert(pMemTable->aTbData, idx < 0 ? 0 : idx, &pTbData) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + +_exit: + *ppTbData = pTbData; + return code; + +_err: + *ppTbData = NULL; + return code; +} + +static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) { + SMemSkipListNode *px; + SMemSkipListNode *pn; + TSDBKEY *pTKey; + int c; + int backward = flags & SL_MOVE_BACKWARD; + int fromPos = flags & SL_MOVE_FROM_POS; + + if (backward) { + px = pTbData->sl.pTail; + + for (int8_t iLevel = pTbData->sl.maxLevel - 1; iLevel >= pTbData->sl.level; iLevel--) { + pos[iLevel] = px; } - // Put into hash - taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData)); + if (pTbData->sl.level) { + if (fromPos) px = pos[pTbData->sl.level - 1]; - // Put into skiplist - tSkipListPut(pMemTable->pSlIdx, pTbData); - } else { - pTbData = *(STbData **)tptr; - } + for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { + pn = SL_NODE_BACKWARD(px, iLevel); + while (pn != pTbData->sl.pHead) { + pTKey = (TSDBKEY *)SL_NODE_DATA(pn); - // copy data to buffer pool - int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock); - pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->pVnode->inUse, tlen); - memcpy(pBlkCopy, pBlock, tlen); + c = tsdbKeyCmprFn(pTKey, pKey); + if (c <= 0) { + break; + } else { + px = pn; + pn = SL_NODE_BACKWARD(px, iLevel); + } + } - tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter); - if (blkIter.row == NULL) return 0; - keyMin = TD_ROW_KEY(blkIter.row); - - tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext); - -#ifdef TD_DEBUG_PRINT_ROW - printf("!!! %s:%d vgId:%d dir:%s table:%" PRIi64 " has %d rows in skiplist\n\n", __func__, __LINE__, - TD_VID(pTsdb->pVnode), pTsdb->dir, pTbData->uid, SL_SIZE(pTbData->pData)); -#endif - - // Set statistics - keyMax = TD_ROW_KEY(blkIter.row); - - pTbData->nrows += pMsgIter->numOfRows; - if (pTbData->minKey.ts > keyMin) pTbData->minKey.ts = keyMin; - if (pTbData->maxKey.ts < keyMax) pTbData->maxKey.ts = keyMax; - - pMemTable->nRow += pMsgIter->numOfRows; - if (pMemTable->minKey.ts > keyMin) pMemTable->minKey.ts = keyMin; - if (pMemTable->maxKey.ts < keyMax) pMemTable->maxKey.ts = keyMax; - - pRsp->numOfRows = pMsgIter->numOfRows; - pRsp->affectedRows = pMsgIter->numOfRows; - pRsp->sver = sverNew; - - return 0; -} - -static STbData *tsdbNewTbData(tb_uid_t uid) { - STbData *pTbData = (STbData *)taosMemoryCalloc(1, sizeof(*pTbData)); - if (pTbData == NULL) { - return NULL; - } - - pTbData->uid = uid; - pTbData->minKey.ts = TSKEY_MAX; - pTbData->maxKey.ts = TSKEY_MIN; - pTbData->nrows = 0; -#if 0 - pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY, - tsdbGetTsTupleKey); -#endif - pTbData->pData = - tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY, tsdbGetTsTupleKey); - if (pTbData->pData == NULL) { - taosMemoryFree(pTbData); - return NULL; - } - - return pTbData; -} - -static void tsdbFreeTbData(STbData *pTbData) { - if (pTbData) { - tSkipListDestroy(pTbData->pData); - taosMemoryFree(pTbData); - } -} - -static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); } - -static int tsdbTbDataComp(const void *arg1, const void *arg2) { - STbData *pTbData1 = (STbData *)arg1; - STbData *pTbData2 = (STbData *)arg2; - - if (pTbData1->uid > pTbData2->uid) { - return 1; - } else if (pTbData1->uid == pTbData2->uid) { - return 0; - } else { - return -1; - } -} - -static char *tsdbTbDataGetUid(const void *arg) { - STbData *pTbData = (STbData *)arg; - return (char *)(&(pTbData->uid)); -} - -static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, - bool merge) { - if (pCols) { - if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { - *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row)); - if (*ppSchema == NULL) { - ASSERT(false); - return -1; + pos[iLevel] = px; } } + } else { + px = pTbData->sl.pHead; - tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge); + for (int8_t iLevel = pTbData->sl.maxLevel - 1; iLevel >= pTbData->sl.level; iLevel--) { + pos[iLevel] = px; + } + + if (pTbData->sl.level) { + if (fromPos) px = pos[pTbData->sl.level - 1]; + + for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { + pn = SL_NODE_FORWARD(px, iLevel); + while (pn != pTbData->sl.pHead) { + pTKey = (TSDBKEY *)SL_NODE_DATA(pn); + + c = tsdbKeyCmprFn(pTKey, pKey); + if (c >= 0) { + break; + } else { + px = pn; + pn = SL_NODE_FORWARD(px, iLevel); + } + } + + pos[iLevel] = px; + } + } + } +} + +static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { + int8_t level = 1; + int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); + const uint32_t factor = 4; + + while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { + level++; } - return 0; + return level; +} +static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow, + int8_t forward) { + int32_t code = 0; + int8_t level; + SMemSkipListNode *pNode; + SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; + + // node + level = tsdbMemSkipListRandLevel(&pTbData->sl); + pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); + if (pNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pNode->level = level; + for (int8_t iLevel = 0; iLevel < level; iLevel++) { + SL_NODE_FORWARD(pNode, iLevel) = NULL; + SL_NODE_BACKWARD(pNode, iLevel) = NULL; + } + + tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow); + + // put + for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) { + SMemSkipListNode *px = pos[iLevel]; + + if (forward) { + SMemSkipListNode *pNext = SL_NODE_FORWARD(px, iLevel); + + SL_NODE_FORWARD(pNode, iLevel) = pNext; + SL_NODE_BACKWARD(pNode, iLevel) = px; + + SL_NODE_BACKWARD(pNext, iLevel) = pNode; + SL_NODE_FORWARD(px, iLevel) = pNode; + } else { + SMemSkipListNode *pPrev = SL_NODE_BACKWARD(px, iLevel); + + SL_NODE_FORWARD(pNode, iLevel) = px; + SL_NODE_BACKWARD(pNode, iLevel) = pPrev; + + SL_NODE_FORWARD(pPrev, iLevel) = pNode; + SL_NODE_BACKWARD(px, iLevel) = pNode; + } + } + + pTbData->sl.size++; + if (pTbData->sl.level < pNode->level) { + pTbData->sl.level = pNode->level; + } + +_exit: + return code; +} + +static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, + SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { + int32_t code = 0; + SSubmitBlkIter blkIter = {0}; + TSDBKEY key = {.version = version}; + SMemSkipListNode *pos[SL_MAX_LEVEL]; + TSDBROW row = {.version = version, .pTSRow = NULL}; + int32_t nRow = 0; + + tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter); + + // backward put first data + row.pTSRow = tGetSubmitBlkNext(&blkIter); + key.ts = row.pTSRow->ts; + nRow++; + tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); + code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0); + if (code) { + goto _err; + } + + if (tsdbKeyCmprFn(&key, &pTbData->minKey) < 0) { + pTbData->minKey = key; + } + + if (tsdbKeyCmprFn(&key, &pMemTable->minKey) < 0) { + pMemTable->minKey = key; + } + + // forward put rest data + row.pTSRow = tGetSubmitBlkNext(&blkIter); + if (row.pTSRow) { + key.ts = row.pTSRow->ts; + for (int8_t iLevel = 0; iLevel < pTbData->sl.maxLevel; iLevel++) { + pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel); + } + do { + nRow++; + tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); + code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); + if (code) { + goto _err; + } + + row.pTSRow = tGetSubmitBlkNext(&blkIter); + key.ts = row.pTSRow->ts; + } while (row.pTSRow); + } + + if (tsdbKeyCmprFn(&key, &pTbData->maxKey) > 0) { + pTbData->maxKey = key; + } + + if (tsdbKeyCmprFn(&key, &pMemTable->maxKey) > 0) { + pMemTable->maxKey = key; + } + pMemTable->nRef++; + pRsp->numOfRows = nRow; + pRsp->affectedRows = nRow; + + return code; + +_err: + return code; +} + +static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { + int32_t n = 0; + + n += tPutI64(p, pRow->version); + if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len); + n += pRow->pTSRow->len; + + return n; +} + +static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) { + int32_t n = 0; + + n += tGetI64(p, &pRow->version); + pRow->pTSRow = (STSRow *)(p + n); + n += pRow->pTSRow->len; + + return n; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index 24c81b1782..aa58cc92a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -15,11 +15,6 @@ #include "tsdb.h" -struct SMemSkipListNode { - int8_t level; - SMemSkipListNode *forwards[0]; -}; - typedef struct { tb_uid_t uid; STSchema *pTSchema; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3a22b57cc0..1d3f689e80 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -67,15 +67,16 @@ enum { }; typedef struct STableCheckInfo { - uint64_t tableId; - TSKEY lastKey; - SBlockInfo* pCompInfo; - int32_t compSize; - int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks - uint8_t chosen : 2; // indicate which iterator should move forward - bool initBuf : 1; // whether to initialize the in-memory skip list iterator or not - SSkipListIterator* iter; // mem buffer skip list iterator - SSkipListIterator* iiter; // imem buffer skip list iterator + uint64_t suid; + uint64_t tableId; + TSKEY lastKey; + SBlockInfo* pCompInfo; + int32_t compSize; + int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks + uint8_t chosen : 2; // indicate which iterator should move forward + bool initBuf : 1; // whether to initialize the in-memory skip list iterator or not + STbDataIter* iter; // mem buffer skip list iterator + STbDataIter* iiter; // imem buffer skip list iterator } STableCheckInfo; typedef struct STableBlockInfo { @@ -265,8 +266,8 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) { for (int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = (STableCheckInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); pCheckInfo->lastKey = pTsdbReadHandle->window.skey; - pCheckInfo->iter = tSkipListDestroyIter(pCheckInfo->iter); - pCheckInfo->iiter = tSkipListDestroyIter(pCheckInfo->iiter); + pCheckInfo->iter = tsdbTbDataIterDestroy(pCheckInfo->iter); + pCheckInfo->iiter = tsdbTbDataIterDestroy(pCheckInfo->iiter); pCheckInfo->initBuf = false; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { @@ -752,23 +753,21 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe pCheckInfo->initBuf = true; int32_t order = pHandle->order; - STbData** pMem = NULL; - STbData** pIMem = NULL; + STbData* pMem = NULL; + STbData* pIMem = NULL; TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey); if (pHandle->pTsdb->mem != NULL) { - pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId)); + tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem); if (pMem != NULL) { - pCheckInfo->iter = - tSkipListCreateIterFromVal((*pMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order); + tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, 0, &pCheckInfo->iter); } } if (pHandle->pTsdb->imem != NULL) { - pIMem = taosHashGet(pHandle->pTsdb->imem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId)); + tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem); if (pIMem != NULL) { - pCheckInfo->iiter = - tSkipListCreateIterFromVal((*pIMem)->pData, (const char*)&tLastKey, TSDB_DATA_TYPE_TIMESTAMP, order); + tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, 0, &pCheckInfo->iiter); } } @@ -777,22 +776,21 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe return false; } - bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tSkipListIterNext(pCheckInfo->iter)); - bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tSkipListIterNext(pCheckInfo->iiter)); + bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterNext(pCheckInfo->iter)); + bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterNext(pCheckInfo->iiter)); if (memEmpty && imemEmpty) { // buffer is empty return false; } if (!memEmpty) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - assert(node != NULL); + TSDBROW row; - STSRow* row = (STSRow*)SL_GET_NODE_DATA(node); - TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer + tsdbTbDataIterGet(pCheckInfo->iter, &row); + TSKEY key = row.pTSRow->ts; // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", - pHandle, pCheckInfo->tableId, key, order, (*pMem)->minKey.ts, (*pMem)->maxKey.ts, pCheckInfo->lastKey, - (*pMem)->nrows, pHandle->idStr); + pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey, + pMem->sl.size, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -805,15 +803,14 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe } if (!imemEmpty) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - assert(node != NULL); + TSDBROW row; - STSRow* row = (STSRow*)SL_GET_NODE_DATA(node); - TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer + tsdbTbDataIterGet(pCheckInfo->iter, &row); + TSKEY key = row.pTSRow->ts; // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", - pHandle, pCheckInfo->tableId, key, order, (*pIMem)->minKey.ts, (*pIMem)->maxKey.ts, pCheckInfo->lastKey, - (*pIMem)->nrows, pHandle->idStr); + pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey, + pIMem->sl.size, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -828,31 +825,23 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe } static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { - tSkipListDestroyIter(pCheckInfo->iter); - tSkipListDestroyIter(pCheckInfo->iiter); + tsdbTbDataIterDestroy(pCheckInfo->iter); + tsdbTbDataIterDestroy(pCheckInfo->iiter); } static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) { + TSDBROW row = {0}; STSRow *rmem = NULL, *rimem = NULL; + if (pCheckInfo->iter) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - if (node != NULL) { - rmem = (STSRow*)SL_GET_NODE_DATA(node); - // TODO: filter max version - // if (TD_ROW_VER(rmem) > maxVer) { - // rmem = NULL; - // } + if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) { + rmem = row.pTSRow; } } if (pCheckInfo->iiter) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - if (node != NULL) { - rimem = (STSRow*)SL_GET_NODE_DATA(node); - // TODO: filter max version - // if (TD_ROW_VER(rimem) > maxVer) { - // rimem = NULL; - // } + if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) { + rimem = row.pTSRow; } } @@ -889,7 +878,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; } else { pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; - tSkipListIterNext(pCheckInfo->iter); + tsdbTbDataIterNext(pCheckInfo->iter); } return r1; } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { @@ -903,28 +892,17 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) { + TSDBROW row; STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - if (node != NULL) { - rmem = (STSRow*)SL_GET_NODE_DATA(node); -#if 0 // TODO: skiplist refactor - if (TD_ROW_VER(rmem) > maxVer) { - rmem = NULL; - } -#endif + if (tsdbTbDataIterGet(pCheckInfo->iter, &row)) { + rmem = row.pTSRow; } } if (pCheckInfo->iiter) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - if (node != NULL) { - rimem = (STSRow*)SL_GET_NODE_DATA(node); -#if 0 // TODO: skiplist refactor - if (TD_ROW_VER(rimem) > maxVer) { - rimem = NULL; - } -#endif + if (tsdbTbDataIterGet(pCheckInfo->iiter, &row)) { + rimem = row.pTSRow; } } @@ -966,7 +944,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int *extraRow = rimem; return rmem; } else { - tSkipListIterNext(pCheckInfo->iter); + tsdbTbDataIterNext(pCheckInfo->iter); pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; return rimem; } @@ -995,7 +973,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { bool hasNext = false; if (pCheckInfo->chosen == CHECKINFO_CHOSEN_MEM) { if (pCheckInfo->iter != NULL) { - hasNext = tSkipListIterNext(pCheckInfo->iter); + hasNext = tsdbTbDataIterNext(pCheckInfo->iter); } if (hasNext) { @@ -1003,11 +981,11 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { } if (pCheckInfo->iiter != NULL) { - return tSkipListIterGet(pCheckInfo->iiter) != NULL; + return tsdbTbDataIterGet(pCheckInfo->iiter, NULL); } } else if (pCheckInfo->chosen == CHECKINFO_CHOSEN_IMEM) { if (pCheckInfo->iiter != NULL) { - hasNext = tSkipListIterNext(pCheckInfo->iiter); + hasNext = tsdbTbDataIterNext(pCheckInfo->iiter); } if (hasNext) { @@ -1015,14 +993,14 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) { } if (pCheckInfo->iter != NULL) { - return tSkipListIterGet(pCheckInfo->iter) != NULL; + return tsdbTbDataIterGet(pCheckInfo->iter, NULL); } } else { if (pCheckInfo->iter != NULL) { - hasNext = tSkipListIterNext(pCheckInfo->iter); + hasNext = tsdbTbDataIterNext(pCheckInfo->iter); } if (pCheckInfo->iiter != NULL) { - hasNext = tSkipListIterNext(pCheckInfo->iiter) || hasNext; + hasNext = tsdbTbDataIterNext(pCheckInfo->iiter) || hasNext; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 6faf6bd167..e184763bc8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * SSubmitBlkRsp r = {0}; tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &r) < 0) { + if (tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r) < 0) { return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4bcd5fc095..9ab941335e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -779,7 +779,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname); } - if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { + if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { submitBlkRsp.code = terrno; } From 3283fe93d882e21ad6184b4a0f6eeaed321f4433 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 11:59:05 +0000 Subject: [PATCH 09/17] more progress --- source/dnode/vnode/src/inc/tsdb.h | 25 +++++---- source/dnode/vnode/src/tsdb/tsdbCommit.c | 61 ++++++---------------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 6 +-- 3 files changed, 33 insertions(+), 59 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b828b1ca78..b3b8637f00 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -58,7 +58,7 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, ST bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow); -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); // tsdbMemTable2.c ============================================================================================== @@ -322,16 +322,24 @@ static void *taosTZfree(void *ptr); static size_t taosTSizeof(void *ptr); static void taosTMemset(void *ptr, int c); -static FORCE_INLINE STSRow *tsdbNextIterRow(SSkipListIterator *pIter) { +struct TSDBROW { + int64_t version; + STSRow *pTSRow; +}; + +static FORCE_INLINE STSRow *tsdbNextIterRow(STbDataIter *pIter) { + TSDBROW row; + if (pIter == NULL) return NULL; - SSkipListNode *node = tSkipListIterGet(pIter); - if (node == NULL) return NULL; + if (tsdbTbDataIterGet(pIter, &row)) { + return row.pTSRow; + } - return (STSRow *)SL_GET_NODE_DATA(node); + return NULL; } -static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { +static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) { STSRow *row = tsdbNextIterRow(pIter); if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; @@ -675,11 +683,6 @@ struct SFSIter { #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC -struct TSDBROW { - int64_t version; - STSRow *pTSRow; -}; - struct TABLEID { tb_uid_t suid; tb_uid_t uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index b1f6b6143e..2d298d2006 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -18,8 +18,8 @@ #define TSDB_MAX_SUBBLOCKS 8 typedef struct { - STable *pTable; - SSkipListIterator *pIter; + STable *pTable; + STbDataIter *pIter; } SCommitIter; typedef struct { @@ -457,7 +457,9 @@ static int32_t tsdbCreateCommitIters(SCommitH *pCommith) { int32_t code = 0; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SMemTable *pMem = pRepo->imem; + STbData *pTbData; SCommitIter *pCommitIter; + STSchema *pTSchema = NULL; pCommith->niters = taosArrayGetSize(pMem->aTbData); pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter)); @@ -467,46 +469,12 @@ static int32_t tsdbCreateCommitIters(SCommitH *pCommith) { } for (int32_t iIter = 0; iIter < pCommith->niters; iIter++) { - pCommitIter = (SCommitIter *)taosArrayGetP(pMem->aTbData, iIter); - // TODO + pTbData = (STbData *)taosArrayGetP(pMem->aTbData, iIter); + pCommitIter = &pCommith->iters[iIter]; - // pCommitIter->pIter = - } - - return code; - -_err: - return code; -#if 0 - SSkipListIterator *pSlIter; - SSkipListNode *pNode; - STbData *pTbData; - STSchema *pTSchema = NULL; - - pCommith->niters = SL_SIZE(pMem->pSlIdx); - pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter)); - if (pCommith->iters == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - // Loop to create iters for each skiplist - pSlIter = tSkipListCreateIter(pMem->pSlIdx); - if (pSlIter == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - for (int i = 0; i < pCommith->niters; i++) { - tSkipListIterNext(pSlIter); - pNode = tSkipListIterGet(pSlIter); - pTbData = (STbData *)pNode->pData; - - pCommitIter = pCommith->iters + i; pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1); - if (pTSchema) { - pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); - tSkipListIterNext(pCommitIter->pIter); + tsdbTbDataIterCreate(pTbData, NULL, 0, &pCommitIter->pIter); pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable)); pCommitIter->pTable->uid = pTbData->uid; @@ -515,15 +483,18 @@ _err: pCommitIter->pTable->pCacheSchema = NULL; } } - tSkipListDestroyIter(pSlIter); -#endif + + return code; + +_err: + return code; } static void tsdbDestroyCommitIters(SCommitH *pCommith) { if (pCommith->iters == NULL) return; for (int i = 1; i < pCommith->niters; i++) { - tSkipListDestroyIter(pCommith->iters[i].pIter); + tsdbTbDataIterDestroy(pCommith->iters[i].pIter); if (pCommith->iters[i].pTable) { tdFreeSchema(pCommith->iters[i].pTable->pSchema); tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema); @@ -1333,7 +1304,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { keyLimit = pBlock[1].minKey.ts - 1; } - SSkipListIterator titer = *(pIter->pIter); + STbDataIter titer = *(pIter->pIter); if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1, false) < 0) return -1; tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, @@ -1542,7 +1513,7 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i lastKey = key2; } - tSkipListIterNext(pCommitIter->pIter); + tsdbTbDataIterNext(pCommitIter->pIter); } else { if (lastKey != key1) { if (lastKey != TSKEY_INITIAL_VAL) { @@ -1574,7 +1545,7 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); } ++(*iter); - tSkipListIterNext(pCommitIter->pIter); + tsdbTbDataIterNext(pCommitIter->pIter); } if (pTarget->numOfRows >= (maxRows - 1)) break; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 46ad963c2c..41caf2598f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -321,7 +321,7 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) { * * The function tries to procceed AS MUCH AS POSSIBLE. */ -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); if (pIter == NULL) return 0; @@ -404,7 +404,7 @@ int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter } } - tSkipListIterNext(pIter); + tsdbTbDataIterNext(pIter); row = tsdbNextIterRow(pIter); if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; @@ -444,7 +444,7 @@ int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter } } - tSkipListIterNext(pIter); + tsdbTbDataIterNext(pIter); row = tsdbNextIterRow(pIter); if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; From c28aac66dc7b7bfdc1fb18d377f1f66c82d9fd65 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 12:22:21 +0000 Subject: [PATCH 10/17] more --- include/common/tcommon.h | 19 ++++++++++--------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 3 +-- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 ++ source/libs/executor/src/executorimpl.c | 1 + 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 88fa0e728f..5c55df9fba 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -105,14 +105,15 @@ typedef struct SColumnInfoData { } SColumnInfoData; typedef struct SQueryTableDataCond { - //STimeWindow twindow; + // STimeWindow twindow; + uint64_t suid; int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; - SColumnInfo *colList; + SColumnInfo* colList; bool loadExternalRows; // load external rows or not int32_t type; // data block load type: int32_t numOfTWindows; - STimeWindow *twindows; + STimeWindow* twindows; } SQueryTableDataCond; void* blockDataDestroy(SSDataBlock* pBlock); @@ -202,17 +203,17 @@ typedef struct SExprInfo { } SExprInfo; typedef struct { - const char* key; - int32_t keyLen; - uint8_t type; - union{ + const char* key; + int32_t keyLen; + uint8_t type; + union { const char* value; int64_t i; uint64_t u; double d; float f; }; - int32_t length; + int32_t length; } SSmlKv; #define QUERY_ASC_FORWARD_STEP 1 @@ -220,7 +221,7 @@ typedef struct { #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) -#define SORT_QSORT_T 0x1 +#define SORT_QSORT_T 0x1 #define SORT_SPILLED_MERGE_SORT_T 0x2 typedef struct SSortExecInfo { int32_t sortMethod; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 41caf2598f..9ebbafe418 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -690,11 +690,11 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i // forward put rest data row.pTSRow = tGetSubmitBlkNext(&blkIter); if (row.pTSRow) { - key.ts = row.pTSRow->ts; for (int8_t iLevel = 0; iLevel < pTbData->sl.maxLevel; iLevel++) { pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel); } do { + key.ts = row.pTSRow->ts; nRow++; tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); @@ -703,7 +703,6 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i } row.pTSRow = tGetSubmitBlkNext(&blkIter); - key.ts = row.pTSRow->ts; } while (row.pTSRow); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1d3f689e80..0ba0e08556 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -108,6 +108,7 @@ typedef struct SBlockLoadSuppInfo { typedef struct STsdbReadHandle { STsdb* pTsdb; + uint64_t suid; SQueryFilePos cur; // current position int16_t order; STimeWindow window; // the primary query time window that applies to all queries @@ -238,6 +239,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j); STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; + info.suid = pTsdbReadHandle->suid; if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) { info.lastKey = pTsdbReadHandle->window.skey; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 10381f72fd..ac06e51514 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4625,6 +4625,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->numOfTWindows = 1; pCond->twindows = taosMemoryCalloc(pCond->numOfTWindows, sizeof(STimeWindow)); pCond->twindows[0] = pTableScanNode->scanRange; + pCond->suid = pTableScanNode->scan.suid; #if 1 // todo work around a problem, remove it later From 5852936a8d9f21e0895e4de6a17cf479ee94ec76 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 12:46:21 +0000 Subject: [PATCH 11/17] fix query problem --- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 ++++-- source/libs/parser/inc/parInsertData.h | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0ba0e08556..10b58546dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -778,8 +778,10 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe return false; } - bool memEmpty = (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterNext(pCheckInfo->iter)); - bool imemEmpty = (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterNext(pCheckInfo->iiter)); + bool memEmpty = + (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterGet(pCheckInfo->iter, NULL)); + bool imemEmpty = + (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterGet(pCheckInfo->iiter, NULL)); if (memEmpty && imemEmpty) { // buffer is empty return false; } diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index aeebf51c96..167970838b 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -116,8 +116,7 @@ static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo } static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows) { - pBlocks->suid = - (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? dataBuf->pTableMeta->uid : dataBuf->pTableMeta->suid); + pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid); pBlocks->uid = dataBuf->pTableMeta->uid; pBlocks->sversion = dataBuf->pTableMeta->sversion; pBlocks->schemaLen = dataBuf->createTbReqLen; From 92f68d83f14f66e98e3492264802ab834323a23e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 6 Jun 2022 12:57:35 +0000 Subject: [PATCH 12/17] fix query problem --- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 10b58546dc..459762cd05 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -390,6 +390,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pReadHandle->locateStart = false; pReadHandle->loadType = pCond->type; + pReadHandle->suid = pCond->suid; pReadHandle->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock; pReadHandle->loadExternalRow = pCond->loadExternalRows; pReadHandle->currentLoadExternalRows = pCond->loadExternalRows; From ae3434222d515fef93c5e69a52fdec2a3f5aeef5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Jun 2022 02:19:05 +0000 Subject: [PATCH 13/17] fix query problem --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 9ebbafe418..b52cb17025 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -514,7 +514,13 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL; } - if (taosArrayInsert(pMemTable->aTbData, idx < 0 ? 0 : idx, &pTbData) == NULL) { + void *p; + if (idx < 0) { + p = taosArrayPush(pMemTable->aTbData, &pTbData); + } else { + p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData); + } + if (p == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } From 933de38cd0a0b0af31d4dd243de5c2078ef34fab Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Jun 2022 02:48:54 +0000 Subject: [PATCH 14/17] fix insert bug --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index b52cb17025..3a4ada04a8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -581,7 +581,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { pn = SL_NODE_FORWARD(px, iLevel); - while (pn != pTbData->sl.pHead) { + while (pn != pTbData->sl.pTail) { pTKey = (TSDBKEY *)SL_NODE_DATA(pn); c = tsdbKeyCmprFn(pTKey, pKey); From 892021b089e977557c978ac788488f42816e9f44 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Jun 2022 05:24:25 +0000 Subject: [PATCH 15/17] fix tsdb save --- source/dnode/vnode/src/inc/tsdb.h | 7 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 17 +++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 85 ++++++++++++++-------- 4 files changed, 68 insertions(+), 43 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b3b8637f00..ea1f38f37e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -171,15 +171,12 @@ struct STsdb { #if 1 // ====================================== struct STable { - uint64_t tid; + uint64_t suid; uint64_t uid; STSchema *pSchema; // latest schema STSchema *pCacheSchema; // cached cache }; -#define TABLE_TID(t) (t)->tid -#define TABLE_UID(t) (t)->uid - // int tsdbPrepareCommit(STsdb *pTsdb); typedef enum { TSDB_FILE_HEAD = 0, // .head @@ -387,7 +384,7 @@ typedef struct { typedef struct { int32_t delimiter; // For recovery usage - int32_t tid; + uint64_t suid; uint64_t uid; SBlock blocks[]; } SBlockInfo; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 2d298d2006..865023e628 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -400,7 +400,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { break; } - if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->uid <= pIdx->uid))) { + if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) { if (tsdbCommitToTable(pCommith, mIter) < 0) { tsdbCloseCommitFile(pCommith, true); // revert the file change @@ -478,7 +478,7 @@ static int32_t tsdbCreateCommitIters(SCommitH *pCommith) { pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable)); pCommitIter->pTable->uid = pTbData->uid; - pCommitIter->pTable->tid = pTbData->uid; + pCommitIter->pTable->suid = pTbData->suid; pCommitIter->pTable->pSchema = pTSchema; pCommitIter->pTable->pCacheSchema = NULL; } @@ -734,8 +734,8 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, pBlkInfo = *ppBuf; pBlkInfo->delimiter = TSDB_FILE_DELIMITER; - pBlkInfo->tid = TABLE_TID(pTable); - pBlkInfo->uid = TABLE_UID(pTable); + pBlkInfo->suid = pTable->suid; + pBlkInfo->uid = pTable->uid; memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); if (nSubBlocks > 0) { @@ -761,7 +761,8 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, // Set pIdx pBlock = taosArrayGetLast(pSupA); - pIdx->uid = TABLE_UID(pTable); + pIdx->suid = pTable->suid; + pIdx->uid = pTable->uid; pIdx->hasLast = pBlock->last ? 1 : 0; pIdx->maxKey = pBlock->maxKey; pIdx->numOfBlocks = (uint32_t)nSupBlocks; @@ -916,7 +917,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { return -1; } - STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL}; + STable table = {.suid = pIdx->suid, .uid = pIdx->uid, .pSchema = NULL}; pCommith->pTable = &table; while (bidx < nBlocks) { @@ -1177,7 +1178,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi } pBlockData->delimiter = TSDB_FILE_DELIMITER; - pBlockData->uid = TABLE_UID(pTable); + pBlockData->uid = pTable->uid; pBlockData->numOfCols = nColsNotAllNull; taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); @@ -1217,7 +1218,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, + REPO_ID(pRepo), pTable->uid, TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts); return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 459762cd05..d5705d3a0a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1109,7 +1109,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index); pCheckInfo->numOfBlocks = 0; - STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId}; + STable table = {.uid = pCheckInfo->tableId, .suid = pCheckInfo->suid}; table.pSchema = pTsdbReadHandle->pSchema; if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index a26436156d..747f1f3a71 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -156,6 +156,24 @@ int tsdbLoadBlockIdx(SReadH *pReadh) { return 0; } +static int32_t tsdbBlockIdxCmprFn(const void *p1, const void *p2) { + SBlockIdx *pBlockIdx1 = (SBlockIdx *)p1; + SBlockIdx *pBlockIdx2 = (SBlockIdx *)p2; + + if (pBlockIdx1->suid < pBlockIdx2->suid) { + return -1; + } else if (pBlockIdx1->suid > pBlockIdx2->suid) { + return 1; + } + + if (pBlockIdx1->uid < pBlockIdx2->uid) { + return -1; + } else if (pBlockIdx1->uid > pBlockIdx2->uid) { + return 1; + } + + return 0; +} int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_READ_REPO(pReadh), pTable, false, false, -1); @@ -171,33 +189,40 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { return -1; } - size_t size = taosArrayGetSize(pReadh->aBlkIdx); - if (size > 0) { - while (true) { - if (pReadh->cidx >= size) { - pReadh->pBlkIdx = NULL; - break; - } - - SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx); - if (pBlkIdx->uid == TABLE_TID(pTable)) { - if (pBlkIdx->uid == TABLE_UID(pTable)) { - pReadh->pBlkIdx = pBlkIdx; - } else { - pReadh->pBlkIdx = NULL; - } - pReadh->cidx++; - break; - } else if (pBlkIdx->uid > TABLE_TID(pTable)) { - pReadh->pBlkIdx = NULL; - break; - } else { - pReadh->cidx++; - } - } - } else { + uint8_t *p = taosArraySearch(pReadh->aBlkIdx, &(SBlockIdx){.suid = pTable->suid, .uid = pTable->uid}, + tsdbBlockIdxCmprFn, TD_EQ); + if (p == NULL) { pReadh->pBlkIdx = NULL; + } else { + pReadh->pBlkIdx = (SBlockIdx *)p; } + // size_t size = taosArrayGetSize(pReadh->aBlkIdx); + // if (size > 0) { + // while (true) { + // if (pReadh->cidx >= size) { + // pReadh->pBlkIdx = NULL; + // break; + // } + + // SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx); + // if (pBlkIdx->uid == TABLE_TID(pTable)) { + // if (pBlkIdx->uid == TABLE_UID(pTable)) { + // pReadh->pBlkIdx = pBlkIdx; + // } else { + // pReadh->pBlkIdx = NULL; + // } + // pReadh->cidx++; + // break; + // } else if (pBlkIdx->uid > TABLE_TID(pTable)) { + // pReadh->pBlkIdx = NULL; + // break; + // } else { + // pReadh->cidx++; + // } + // } + // } else { + // pReadh->pBlkIdx = NULL; + // } return 0; } @@ -553,12 +578,12 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) { int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) { int tlen = 0; - // tlen += taosEncodeVariantI32(buf, pIdx->tid); + tlen += taosEncodeFixedU64(buf, pIdx->suid); + tlen += taosEncodeFixedU64(buf, pIdx->uid); tlen += taosEncodeVariantU32(buf, pIdx->len); tlen += taosEncodeVariantU32(buf, pIdx->offset); tlen += taosEncodeFixedU8(buf, pIdx->hasLast); tlen += taosEncodeVariantU32(buf, pIdx->numOfBlocks); - tlen += taosEncodeFixedU64(buf, pIdx->uid); tlen += taosEncodeFixedU64(buf, pIdx->maxKey.ts); return tlen; @@ -570,6 +595,10 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { uint64_t value = 0; // if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; + pIdx->suid = (int64_t)value; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; + pIdx->uid = (int64_t)value; if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; @@ -577,8 +606,6 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL; pIdx->numOfBlocks = numOfBlocks; if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; - pIdx->uid = (int64_t)value; - if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; pIdx->maxKey.ts = (TSKEY)value; return buf; From e1967b5708ced50d7f3f1810c74b7e3b50189f30 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Jun 2022 07:56:33 +0000 Subject: [PATCH 16/17] fix ci problem --- source/dnode/vnode/src/meta/metaQuery.c | 138 ++++++++++++------------ source/dnode/vnode/src/tq/tqRead.c | 4 +- 2 files changed, 70 insertions(+), 72 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 21a55d6463..063a78b0c6 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) { } int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) { - SMeta * pMeta = pReader->pMeta; + SMeta *pMeta = pReader->pMeta; STbDbKey tbDbKey = {.version = version, .uid = uid}; // query table.db @@ -54,7 +54,7 @@ _err: } int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { - SMeta * pMeta = pReader->pMeta; + SMeta *pMeta = pReader->pMeta; int64_t version; // query uid.idx @@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { } int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { - SMeta * pMeta = pReader->pMeta; + SMeta *pMeta = pReader->pMeta; tb_uid_t uid; // query name.idx @@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { } tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { - void * pData = NULL; + void *pData = NULL; int nData = 0; tb_uid_t uid = 0; @@ -134,7 +134,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { int metaTbCursorNext(SMTbCursor *pTbCur) { int ret; - void * pBuf; + void *pBuf; STbCfg tbCfg; for (;;) { @@ -155,7 +155,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) { } SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { - void * pData = NULL; + void *pData = NULL; int nData = 0; int64_t version; SSchemaWrapper schema = {0}; @@ -163,37 +163,47 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo SDecoder dc = {0}; metaRLock(pMeta); - if (sver < 0) { - if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) { - goto _err; - } - - version = *(int64_t *)pData; - - tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData); - - SMetaEntry me = {0}; - tDecoderInit(&dc, pData, nData); - metaDecodeEntry(&dc, &me); - if (me.type == TSDB_SUPER_TABLE) { - pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow); - } else if (me.type == TSDB_NORMAL_TABLE) { - pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); - } else { - ASSERT(0); - } - tDecoderClear(&dc); - } else { - if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) { - goto _err; - } - - tDecoderInit(&dc, pData, nData); - tDecodeSSchemaWrapper(&dc, &schema); - pSchema = tCloneSSchemaWrapper(&schema); - tDecoderClear(&dc); +_query: + if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) { + goto _err; } + version = *(int64_t *)pData; + + tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData); + SMetaEntry me = {0}; + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &me); + if (me.type == TSDB_SUPER_TABLE) { + if (sver == -1 || sver == me.stbEntry.schemaRow.version) { + pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow); + tDecoderClear(&dc); + goto _exit; + } + } else if (me.type == TSDB_CHILD_TABLE) { + uid = me.ctbEntry.suid; + tDecoderClear(&dc); + goto _query; + } else { + if (sver == -1 || sver == me.ntbEntry.schemaRow.version) { + pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); + tDecoderClear(&dc); + goto _exit; + } + } + tDecoderClear(&dc); + + // query from skm db + if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) { + goto _err; + } + + tDecoderInit(&dc, pData, nData); + tDecodeSSchemaWrapper(&dc, &schema); + pSchema = tCloneSSchemaWrapper(&schema); + tDecoderClear(&dc); + +_exit: metaULock(pMeta); tdbFree(pData); return pSchema; @@ -205,11 +215,11 @@ _err: } struct SMCtbCursor { - SMeta * pMeta; - TBC * pCur; + SMeta *pMeta; + TBC *pCur; tb_uid_t suid; - void * pKey; - void * pVal; + void *pKey; + void *pVal; int kLen; int vLen; }; @@ -279,25 +289,13 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { } STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { - tb_uid_t quid; - SMetaReader mr = {0}; - STSchema * pTSchema = NULL; + // SMetaReader mr = {0}; + STSchema *pTSchema = NULL; SSchemaWrapper *pSW = NULL; STSchemaBuilder sb = {0}; - SSchema * pSchema; + SSchema *pSchema; - metaReaderInit(&mr, pMeta, 0); - metaGetTableEntryByUid(&mr, uid); - - if (mr.me.type == TSDB_CHILD_TABLE) { - quid = mr.me.ctbEntry.suid; - } else { - quid = uid; - } - - metaReaderClear(&mr); - - pSW = metaGetTableSchema(pMeta, quid, sver, 0); + pSW = metaGetTableSchema(pMeta, uid, sver, 0); if (!pSW) return NULL; tdInitTSchemaBuilder(&sb, pSW->version); @@ -321,11 +319,11 @@ int metaGetTbNum(SMeta *pMeta) { } typedef struct { - SMeta * pMeta; - TBC * pCur; + SMeta *pMeta; + TBC *pCur; tb_uid_t uid; - void * pKey; - void * pVal; + void *pKey; + void *pVal; int kLen; int vLen; } SMSmaCursor; @@ -397,7 +395,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) { STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { STSmaWrapper *pSW = NULL; - SArray * pSmaIds = NULL; + SArray *pSmaIds = NULL; if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) { return NULL; @@ -421,7 +419,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { metaReaderInit(&mr, pMeta, 0); int64_t smaId; int smaIdx = 0; - STSma * pTSma = NULL; + STSma *pTSma = NULL; for (int i = 0; i < pSW->number; ++i) { smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); if (metaGetTableEntryByUid(&mr, smaId) < 0) { @@ -469,7 +467,7 @@ _err: } STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { - STSma * pTSma = NULL; + STSma *pTSma = NULL; SMetaReader mr = {0}; metaReaderInit(&mr, pMeta, 0); if (metaGetTableEntryByUid(&mr, indexUid) < 0) { @@ -491,7 +489,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { } SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) { - SArray * pUids = NULL; + SArray *pUids = NULL; SSmaIdxKey *pSmaIdxKey = NULL; SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); @@ -529,7 +527,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) { } SArray *metaGetSmaTbUids(SMeta *pMeta) { - SArray * pUids = NULL; + SArray *pUids = NULL; SSmaIdxKey *pSmaIdxKey = NULL; tb_uid_t lastUid = 0; @@ -591,13 +589,13 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) { } typedef struct { - SMeta * pMeta; - TBC * pCur; + SMeta *pMeta; + TBC *pCur; tb_uid_t suid; int16_t cid; int16_t type; - void * pKey; - void * pVal; + void *pKey; + void *pVal; int32_t kLen; int32_t vLen; } SIdxCursor; @@ -621,7 +619,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { int32_t nKey = 0; int32_t nTagData = 0; - void * tagData = NULL; + void *tagData = NULL; if (IS_VAR_DATA_TYPE(param->type)) { tagData = varDataVal(param->val); @@ -640,7 +638,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { goto END; } - void * entryKey = NULL, *entryVal = NULL; + void *entryKey = NULL, *entryVal = NULL; int32_t nEntryKey, nEntryVal; while (1) { valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 43f49b4cc2..2ecaeff747 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -141,10 +141,10 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p } // this interface use suid instead of uid - pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); + pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true); if (pHandle->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table", - pHandle->msgIter.suid, pHandle->cachedSchemaVer); + pHandle->msgIter.uid, pHandle->cachedSchemaVer); /*ASSERT(0);*/ terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; From 53d66c41ac71bf1b5281d5e795c9414f100fdbc5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 7 Jun 2022 08:10:21 +0000 Subject: [PATCH 17/17] fix ci test --- source/dnode/vnode/src/tsdb/tsdbRead.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d5705d3a0a..146757096f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -758,19 +758,20 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe STbData* pMem = NULL; STbData* pIMem = NULL; + int8_t backward = (pHandle->order == TSDB_ORDER_DESC) ? 1 : 0; TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey); if (pHandle->pTsdb->mem != NULL) { tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem); if (pMem != NULL) { - tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, 0, &pCheckInfo->iter); + tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iter); } } if (pHandle->pTsdb->imem != NULL) { tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem); if (pIMem != NULL) { - tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, 0, &pCheckInfo->iiter); + tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iiter); } }