diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 6ce1e1e6e0..228d9ffa90 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -39,13 +39,24 @@ typedef struct SDelOp SDelOp; static int tsdbKeyCmprFn(const void *p1, const void *p2); +// tsdbMemTable ================ +typedef struct STbData STbData; +typedef struct SMemTable SMemTable; +typedef struct SMergeInfo SMergeInfo; +typedef struct STable STable; + +int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); +void tsdbMemTableDestroy(SMemTable *pMemTable); +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); + // tsdbMemTable2.c ============================================================================================== -typedef struct SMemTable SMemTable; +typedef struct SMemTable2 SMemTable2; typedef struct SMemData SMemData; typedef struct SMemDataIter SMemDataIter; -int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable); -void tsdbMemTableDestroy2(SMemTable *pMemTable); +int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable); +void tsdbMemTableDestroy2(SMemTable2 *pMemTable); int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk); int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); @@ -124,17 +135,6 @@ int tsdbRLockFS(STsdbFS *pFs); int tsdbWLockFS(STsdbFS *pFs); int tsdbUnLockFS(STsdbFS *pFs); -// tsdbMemTable ================ -typedef struct STbData STbData; -typedef struct STsdbMemTable STsdbMemTable; -typedef struct SMergeInfo SMergeInfo; -typedef struct STable STable; - -int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); -void tsdbMemTableDestroy(STsdbMemTable *pMemTable); -int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); - // structs typedef struct { int minFid; @@ -145,16 +145,16 @@ typedef struct { #define TSDB_DATA_DIR_LEN 6 // adapt accordingly struct STsdb { - char *path; - SVnode *pVnode; - TdThreadMutex mutex; - char dir[TSDB_DATA_DIR_LEN]; - bool repoLocked; - STsdbKeepCfg keepCfg; - STsdbMemTable *mem; - STsdbMemTable *imem; - SRtn rtn; - STsdbFS *fs; + char *path; + SVnode *pVnode; + TdThreadMutex mutex; + char dir[TSDB_DATA_DIR_LEN]; + bool repoLocked; + STsdbKeepCfg keepCfg; + SMemTable *mem; + SMemTable *imem; + SRtn rtn; + STsdbFS *fs; }; #if 1 // ====================================== @@ -216,7 +216,7 @@ struct STbData { SSkipList *pData; }; -struct STsdbMemTable { +struct SMemTable { SVBufPool *pPool; T_REF_DECLARE() SRWLatch latch; @@ -677,7 +677,7 @@ typedef struct { TSKEY eKey; } SDelInfo; -struct SMemTable { +struct SMemTable2 { STsdb *pTsdb; int32_t nRef; TSDBKEY minKey; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 59a795b77a..06c75d029d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -92,7 +92,7 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return 0; - STsdbMemTable *pMem; + SMemTable *pMem; if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) { return -1; @@ -244,7 +244,7 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { } static void tsdbStartCommit(STsdb *pRepo) { - STsdbMemTable *pMem = pRepo->imem; + SMemTable *pMem = pRepo->imem; tsdbInfo("vgId:%d, start to commit", REPO_ID(pRepo)); @@ -455,7 +455,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int tsdbCreateCommitIters(SCommitH *pCommith) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbMemTable *pMem = pRepo->imem; + SMemTable *pMem = pRepo->imem; SSkipListIterator *pSlIter; SCommitIter *pCommitIter; SSkipListNode *pNode; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 321667c55b..07d4ef8656 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -16,19 +16,19 @@ #include "tsdb.h" typedef struct { - SMemTable *pMemTable; - int32_t minutes; - int8_t precision; - TSKEY nCommitKey; - int32_t fid; - TSKEY minKey; - TSKEY maxKey; - SReadH readh; - SDFileSet wSet; - SArray *aBlkIdx; - SArray *aSupBlk; - SArray *aSubBlk; - SArray *aDelInfo; + SMemTable2 *pMemTable; + int32_t minutes; + int8_t precision; + TSKEY nCommitKey; + int32_t fid; + TSKEY minKey; + TSKEY maxKey; + SReadH readh; + SDFileSet wSet; + SArray *aBlkIdx; + SArray *aSupBlk; + SArray *aSubBlk; + SArray *aDelInfo; } SCommitH; static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb); @@ -39,7 +39,7 @@ int32_t tsdbBegin2(STsdb *pTsdb) { int32_t code = 0; ASSERT(pTsdb->mem == NULL); - code = tsdbMemTableCreate2(pTsdb, (SMemTable **)&pTsdb->mem); + code = tsdbMemTableCreate2(pTsdb, (SMemTable2 **)&pTsdb->mem); if (code) { tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); goto _exit; @@ -80,8 +80,8 @@ _err: } static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) { - int32_t code = 0; - SMemTable *pMemTable = (SMemTable *)pTsdb->mem; + int32_t code = 0; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode)); @@ -131,9 +131,9 @@ _err: } static int32_t tsdbCommitEnd(SCommitH *pCHandle) { - int32_t code = 0; - STsdb *pTsdb = pCHandle->pMemTable->pTsdb; - SMemTable *pMemTable = (SMemTable *)pTsdb->imem; + int32_t code = 0; + STsdb *pTsdb = pCHandle->pMemTable->pTsdb; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->imem; // end transaction code = tsdbEndFSTxn(pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index fa392baa16..d683c7df27 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -23,15 +23,15 @@ static char *tsdbTbDataGetUid(const void *arg); static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge); -int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { - STsdbMemTable *pMemTable; - SVnode *pVnode; +int tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { + SMemTable *pMemTable; + SVnode *pVnode; *ppMemTable = NULL; pVnode = pTsdb->pVnode; // alloc handle - pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); + pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { return -1; } @@ -60,7 +60,7 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { return 0; } -void tsdbMemTableDestroy(STsdbMemTable *pMemTable) { +void tsdbMemTableDestroy(SMemTable *pMemTable) { if (pMemTable) { taosHashCleanup(pMemTable->pHashIdx); SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx); @@ -240,7 +240,7 @@ int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { SSubmitBlkIter blkIter = {0}; - STsdbMemTable *pMemTable = pTsdb->mem; + SMemTable *pMemTable = pTsdb->mem; void *tptr; STbData *pTbData; STSRow *row; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index cf8557bba3..24c81b1782 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -35,21 +35,21 @@ typedef struct { #define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_FROM_POS 0x2 -static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); +static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData); static int memDataPCmprFn(const void *p1, const void *p2); static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); -static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, +static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version, SVSubmitBlk *pSubmitBlk); static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); // SMemTable ============================================== -int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { - int32_t code = 0; - SMemTable *pMemTable = NULL; +int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable) { + int32_t code = 0; + SMemTable2 *pMemTable = NULL; - pMemTable = (SMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable)); + pMemTable = (SMemTable2 *)taosMemoryCalloc(1, sizeof(*pMemTable)); if (pMemTable == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -75,16 +75,16 @@ _err: return code; } -void tsdbMemTableDestroy2(SMemTable *pMemTable) { +void tsdbMemTableDestroy2(SMemTable2 *pMemTable) { taosArrayDestroyEx(pMemTable->aMemData, NULL /*TODO*/); taosMemoryFree(pMemTable); } int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk) { - int32_t code = 0; - SMemTable *pMemTable = (SMemTable *)pTsdb->mem; // TODO - SMemData *pMemData; - TSDBROW row = {.version = version}; + int32_t code = 0; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO + SMemData *pMemData; + TSDBROW row = {.version = version}; ASSERT(pMemTable); ASSERT(pSubmitBlk->nData > 0); @@ -112,10 +112,10 @@ _err: } int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { - int32_t code = 0; - SMemTable *pMemTable = (SMemTable *)pTsdb->mem; // TODO - SMemData *pMemData; - SVBufPool *pPool = pTsdb->pVnode->inUse; + int32_t code = 0; + SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem; // TODO + SMemData *pMemData; + SVBufPool *pPool = pTsdb->pVnode->inUse; ASSERT(pMemTable); @@ -250,7 +250,7 @@ void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) { } } -static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) { +static int32_t tsdbGetOrCreateMemData(SMemTable2 *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) { int32_t code = 0; int32_t idx = 0; SMemData *pMemDataT = &(SMemData){.suid = suid, .uid = uid}; @@ -421,7 +421,7 @@ static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY } } -static int32_t memDataDoPut(SMemTable *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, +static int32_t memDataDoPut(SMemTable2 *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow, int8_t forward) { int32_t code = 0; int8_t level; @@ -475,7 +475,7 @@ _exit: return code; } -static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, +static int32_t tsdbInsertTableDataImpl(SMemTable2 *pMemTable, SMemData *pMemData, int64_t version, SVSubmitBlk *pSubmitBlk) { int32_t code = 0; int32_t n = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 56220beea4..7693e2bce1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -200,8 +200,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)pHandle; - int64_t rows = 0; - STsdbMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; + int64_t rows = 0; + SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; if (pMemTable == NULL) { return rows; } @@ -658,7 +658,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL } #if 0 -tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) { +tsdbReaderT tsdbQueryCacheLastT(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemTable* pMemRef) { STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTablesT(tsdb, pCond, groupList, qId, pMemRef); if (pTsdbReadHandle == NULL) { return NULL; @@ -2918,7 +2918,7 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) { // current result is empty if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey && pTsdbReadHandle->cur.rows == 0) { - // STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable; + // SMemTable* pMemRef = pTsdbReadHandle->pMemTable; // doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef); // doGetExternalRow(pTsdbReadHandle, TSDB_NEXT_ROW, pMemRef); @@ -3216,7 +3216,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { } } -// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, STsdbMemTable* pMemRef) { +// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) { // STsdbReadHandle* pSecQueryHandle = NULL; // // if (type == TSDB_PREV_ROW && pTsdbReadHandle->prev) {