From 3359b8e6204a02b5d3365c7a24ff44767f28df08 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Aug 2022 10:16:07 +0000 Subject: [PATCH] more optimize --- source/dnode/vnode/src/inc/tsdb.h | 18 ++- source/dnode/vnode/src/tsdb/tsdbCache.c | 4 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 19 ++- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 144 +++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbRead.c | 8 +- 5 files changed, 134 insertions(+), 59 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f46726e647..0a03334cfd 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -194,11 +194,12 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck); // tsdbMemTable ============================================================================================== // SMemTable -int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); -void tsdbMemTableDestroy(SMemTable *pMemTable); -void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); -void tsdbRefMemTable(SMemTable *pMemTable); -void tsdbUnrefMemTable(SMemTable *pMemTable); +int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); +void tsdbMemTableDestroy(SMemTable *pMemTable); +STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); +void tsdbRefMemTable(SMemTable *pMemTable); +void tsdbUnrefMemTable(SMemTable *pMemTable); +SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter); @@ -359,6 +360,7 @@ struct STbData { SDelData *pHead; SDelData *pTail; SMemSkipList sl; + STbData *next; }; struct SMemTable { @@ -372,7 +374,11 @@ struct SMemTable { int64_t maxVersion; int64_t nRow; int64_t nDel; - SArray *aTbData; // SArray + struct { + int32_t nTbData; + int32_t nBucket; + STbData **aBucket; + }; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3cff6bef27..24d6b2f385 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -742,12 +742,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs STbData *pMem = NULL; if (pIter->pReadSnap->pMem) { - tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid, &pMem); + pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid); } STbData *pIMem = NULL; if (pIter->pReadSnap->pIMem) { - tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid, &pIMem); + pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid); } pIter->pTsdb = pTsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index ae7c5f1a90..f59a8e53f1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -35,6 +35,7 @@ typedef struct { int32_t minRow; int32_t maxRow; int8_t cmprAlg; + SArray *aTbDataP; STsdbFS fs; // -------------- TSKEY nextKey; // reset by each table commit @@ -1212,8 +1213,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { if (code) goto _err; // commit file data impl - for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pMemTable->aTbData); iTbData++) { - STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { + STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); // move commit until current (suid, uid) code = tsdbMoveCommitData(pCommitter, *(TABLEID *)pTbData); @@ -1270,6 +1271,11 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); + if (pCommitter->aTbDataP == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } code = tsdbFSCopy(pTsdb, &pCommitter->fs); if (code) goto _err; @@ -1395,13 +1401,13 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { int32_t iDelIdx = 0; int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx); int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); + int32_t nTbData = taosArrayGetSize(pCommitter->aTbDataP); STbData *pTbData; SDelIdx *pDelIdx; ASSERT(nTbData > 0); - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; while (true) { if (pTbData == NULL && pDelIdx == NULL) break; @@ -1427,7 +1433,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { if (code) goto _err; iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; + pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; continue; _commit_disk_del: @@ -1443,7 +1449,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { if (code) goto _err; iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; + pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; iDelIdx++; pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; continue; @@ -1491,6 +1497,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { tsdbUnrefMemTable(pMemTable); tsdbFSDestroy(&pCommitter->fs); + taosArrayDestroy(pCommitter->aTbDataP); tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 8ae0e824cf..278b657b9e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -15,6 +15,7 @@ #include "tsdb.h" +#define MEM_MIN_HASH 1024 #define SL_MAX_LEVEL 5 #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) @@ -49,8 +50,10 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { pMemTable->maxVersion = VERSION_MIN; pMemTable->nRow = 0; pMemTable->nDel = 0; - pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *)); - if (pMemTable->aTbData == NULL) { + pMemTable->nTbData = 0; + pMemTable->nBucket = MEM_MIN_HASH; + pMemTable->aBucket = (STbData **)taosMemoryCalloc(pMemTable->nBucket, sizeof(STbData *)); + if (pMemTable->aBucket == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pMemTable); goto _err; @@ -68,37 +71,30 @@ _err: void tsdbMemTableDestroy(SMemTable *pMemTable) { if (pMemTable) { vnodeBufPoolUnRef(pMemTable->pPool); - taosArrayDestroy(pMemTable->aTbData); + taosMemoryFree(pMemTable->aBucket); taosMemoryFree(pMemTable); } } -static int32_t tbDataPCmprFn(const void *p1, const void *p2) { - STbData *pTbData1 = *(STbData **)p1; - STbData *pTbData2 = *(STbData **)p2; +static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) { + STbData *pTbData = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket]; - if (pTbData1->suid < pTbData2->suid) { - return -1; - } else if (pTbData1->suid > pTbData2->suid) { - return 1; + while (pTbData) { + if (pTbData->uid == uid) break; + pTbData = pTbData->next; } - if (pTbData1->uid < pTbData2->uid) { - return -1; - } else if (pTbData1->uid > pTbData2->uid) { - return 1; - } - - return 0; + return pTbData; } -void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) { - STbData *pTbData = &(STbData){.suid = suid, .uid = uid}; + +STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) { + STbData *pTbData; taosRLockLatch(&pMemTable->latch); - void *p = taosArraySearch(pMemTable->aTbData, &pTbData, tbDataPCmprFn, TD_EQ); + pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid); taosRUnLockLatch(&pMemTable->latch); - *ppTbData = p ? *(STbData **)p : NULL; + return pTbData; } int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, @@ -318,18 +314,44 @@ _exit: return pIter->pRow; } +static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { + int32_t code = 0; + + int32_t nBucket = pMemTable->nBucket * 2; + STbData **aBucket = (STbData **)taosMemoryCalloc(nBucket, sizeof(STbData *)); + if (aBucket == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) { + STbData *pTbData = pMemTable->aBucket[iBucket]; + + while (pTbData) { + STbData *pNext = pTbData->next; + + int32_t idx = TABS(pTbData->uid) % nBucket; + pTbData->next = aBucket[idx]; + aBucket[idx] = pTbData; + + pTbData = pNext; + } + } + + taosMemoryFree(pMemTable->aBucket); + pMemTable->nBucket = nBucket; + pMemTable->aBucket = aBucket; + +_exit: + return code; +} + static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) { - int32_t code = 0; - int32_t idx = 0; - STbData *pTbData = NULL; - STbData *pTbDataT = &(STbData){.suid = suid, .uid = uid}; + int32_t code = 0; // get - idx = taosArraySearchIdx(pMemTable->aTbData, &pTbDataT, tbDataPCmprFn, TD_GE); - if (idx >= 0) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, idx); - if (tbDataPCmprFn(&pTbDataT, &pTbData) == 0) goto _exit; - } + STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid); + if (pTbData) goto _exit; // create SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; @@ -365,21 +387,23 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL; } - void *p; - if (idx < 0) { - idx = taosArrayGetSize(pMemTable->aTbData); + taosWLockLatch(&pMemTable->latch); + + if (pMemTable->nTbData >= pMemTable->nBucket) { + code = tsdbMemTableRehash(pMemTable); + if (code) { + taosWUnLockLatch(&pMemTable->latch); + goto _err; + } } - taosWLockLatch(&pMemTable->latch); - p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData); + int32_t idx = TABS(uid) % pMemTable->nBucket; + pTbData->next = pMemTable->aBucket[idx]; + pMemTable->aBucket[idx] = pTbData; + pMemTable->nTbData++; + taosWUnLockLatch(&pMemTable->latch); - tsdbDebug("vgId:%d, add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx); - - if (p == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } _exit: *ppTbData = pTbData; return code; @@ -622,3 +646,41 @@ void tsdbUnrefMemTable(SMemTable *pMemTable) { tsdbMemTableDestroy(pMemTable); } } + +static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) { + STbData *pTbData1 = *(STbData **)p1; + STbData *pTbData2 = *(STbData **)p2; + + if (pTbData1->suid < pTbData2->suid) { + return -1; + } else if (pTbData1->suid > pTbData2->suid) { + return 1; + } + + if (pTbData1->uid < pTbData2->uid) { + return -1; + } else if (pTbData1->uid > pTbData2->uid) { + return 1; + } + + return 0; +} + +SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) { + SArray *aTbDataP = taosArrayInit(pMemTable->nTbData, sizeof(STbData *)); + if (aTbDataP == NULL) goto _exit; + + for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) { + STbData *pTbData = pMemTable->aBucket[iBucket]; + + while (pTbData) { + taosArrayPush(aTbDataP, &pTbData); + pTbData = pTbData->next; + } + } + + taosArraySort(aTbDataP, tbDataPCmprFn); + +_exit: + return aTbDataP; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c6ae1c529c..3e2da64701 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1616,7 +1616,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STbData* d = NULL; if (pReader->pReadSnap->pMem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d); + d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); if (d != NULL) { code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -1637,7 +1637,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STbData* di = NULL; if (pReader->pReadSnap->pIMem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di); + di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); if (di != NULL) { code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -3103,7 +3103,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { STbData* d = NULL; if (pReader->pTsdb->mem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d); + d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); if (d != NULL) { rows += tsdbGetNRowsInTbData(d); } @@ -3111,7 +3111,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { STbData* di = NULL; if (pReader->pTsdb->imem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di); + di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); if (di != NULL) { rows += tsdbGetNRowsInTbData(di); }