more optimize
This commit is contained in:
parent
e817950802
commit
3359b8e620
|
@ -194,11 +194,12 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
|
|||
int32_t tsdbReadAndCheck(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck);
|
||||
// tsdbMemTable ==============================================================================================
|
||||
// SMemTable
|
||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||
void tsdbMemTableDestroy(SMemTable *pMemTable);
|
||||
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
|
||||
void tsdbRefMemTable(SMemTable *pMemTable);
|
||||
void tsdbUnrefMemTable(SMemTable *pMemTable);
|
||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||
void tsdbMemTableDestroy(SMemTable *pMemTable);
|
||||
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
|
||||
void tsdbRefMemTable(SMemTable *pMemTable);
|
||||
void tsdbUnrefMemTable(SMemTable *pMemTable);
|
||||
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
||||
// STbDataIter
|
||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||
|
@ -359,6 +360,7 @@ struct STbData {
|
|||
SDelData *pHead;
|
||||
SDelData *pTail;
|
||||
SMemSkipList sl;
|
||||
STbData *next;
|
||||
};
|
||||
|
||||
struct SMemTable {
|
||||
|
@ -372,7 +374,11 @@ struct SMemTable {
|
|||
int64_t maxVersion;
|
||||
int64_t nRow;
|
||||
int64_t nDel;
|
||||
SArray *aTbData; // SArray<STbData*>
|
||||
struct {
|
||||
int32_t nTbData;
|
||||
int32_t nBucket;
|
||||
STbData **aBucket;
|
||||
};
|
||||
};
|
||||
|
||||
struct TSDBROW {
|
||||
|
|
|
@ -742,12 +742,12 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
|
||||
STbData *pMem = NULL;
|
||||
if (pIter->pReadSnap->pMem) {
|
||||
tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid, &pMem);
|
||||
pMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pMem, suid, uid);
|
||||
}
|
||||
|
||||
STbData *pIMem = NULL;
|
||||
if (pIter->pReadSnap->pIMem) {
|
||||
tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid, &pIMem);
|
||||
pIMem = tsdbGetTbDataFromMemTable(pIter->pReadSnap->pIMem, suid, uid);
|
||||
}
|
||||
|
||||
pIter->pTsdb = pTsdb;
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct {
|
|||
int32_t minRow;
|
||||
int32_t maxRow;
|
||||
int8_t cmprAlg;
|
||||
SArray *aTbDataP;
|
||||
STsdbFS fs;
|
||||
// --------------
|
||||
TSKEY nextKey; // reset by each table commit
|
||||
|
@ -1212,8 +1213,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
|
|||
if (code) goto _err;
|
||||
|
||||
// commit file data impl
|
||||
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pMemTable->aTbData); iTbData++) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
|
||||
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
||||
|
||||
// move commit until current (suid, uid)
|
||||
code = tsdbMoveCommitData(pCommitter, *(TABLEID *)pTbData);
|
||||
|
@ -1270,6 +1271,11 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
|||
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
||||
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
||||
if (pCommitter->aTbDataP == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbFSCopy(pTsdb, &pCommitter->fs);
|
||||
if (code) goto _err;
|
||||
|
@ -1395,13 +1401,13 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
|
|||
int32_t iDelIdx = 0;
|
||||
int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
|
||||
int32_t iTbData = 0;
|
||||
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
|
||||
int32_t nTbData = taosArrayGetSize(pCommitter->aTbDataP);
|
||||
STbData *pTbData;
|
||||
SDelIdx *pDelIdx;
|
||||
|
||||
ASSERT(nTbData > 0);
|
||||
|
||||
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
|
||||
pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
||||
pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
|
||||
while (true) {
|
||||
if (pTbData == NULL && pDelIdx == NULL) break;
|
||||
|
@ -1427,7 +1433,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
|
|||
if (code) goto _err;
|
||||
|
||||
iTbData++;
|
||||
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
|
||||
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
|
||||
continue;
|
||||
|
||||
_commit_disk_del:
|
||||
|
@ -1443,7 +1449,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
|
|||
if (code) goto _err;
|
||||
|
||||
iTbData++;
|
||||
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
|
||||
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
|
||||
iDelIdx++;
|
||||
pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
|
||||
continue;
|
||||
|
@ -1491,6 +1497,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
|||
|
||||
tsdbUnrefMemTable(pMemTable);
|
||||
tsdbFSDestroy(&pCommitter->fs);
|
||||
taosArrayDestroy(pCommitter->aTbDataP);
|
||||
|
||||
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
return code;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
#define MEM_MIN_HASH 1024
|
||||
#define SL_MAX_LEVEL 5
|
||||
|
||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
|
||||
|
@ -49,8 +50,10 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
|
|||
pMemTable->maxVersion = VERSION_MIN;
|
||||
pMemTable->nRow = 0;
|
||||
pMemTable->nDel = 0;
|
||||
pMemTable->aTbData = taosArrayInit(128, sizeof(STbData *));
|
||||
if (pMemTable->aTbData == NULL) {
|
||||
pMemTable->nTbData = 0;
|
||||
pMemTable->nBucket = MEM_MIN_HASH;
|
||||
pMemTable->aBucket = (STbData **)taosMemoryCalloc(pMemTable->nBucket, sizeof(STbData *));
|
||||
if (pMemTable->aBucket == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pMemTable);
|
||||
goto _err;
|
||||
|
@ -68,37 +71,30 @@ _err:
|
|||
void tsdbMemTableDestroy(SMemTable *pMemTable) {
|
||||
if (pMemTable) {
|
||||
vnodeBufPoolUnRef(pMemTable->pPool);
|
||||
taosArrayDestroy(pMemTable->aTbData);
|
||||
taosMemoryFree(pMemTable->aBucket);
|
||||
taosMemoryFree(pMemTable);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tbDataPCmprFn(const void *p1, const void *p2) {
|
||||
STbData *pTbData1 = *(STbData **)p1;
|
||||
STbData *pTbData2 = *(STbData **)p2;
|
||||
static FORCE_INLINE STbData *tsdbGetTbDataFromMemTableImpl(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
|
||||
STbData *pTbData = pMemTable->aBucket[TABS(uid) % pMemTable->nBucket];
|
||||
|
||||
if (pTbData1->suid < pTbData2->suid) {
|
||||
return -1;
|
||||
} else if (pTbData1->suid > pTbData2->suid) {
|
||||
return 1;
|
||||
while (pTbData) {
|
||||
if (pTbData->uid == uid) break;
|
||||
pTbData = pTbData->next;
|
||||
}
|
||||
|
||||
if (pTbData1->uid < pTbData2->uid) {
|
||||
return -1;
|
||||
} else if (pTbData1->uid > pTbData2->uid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return pTbData;
|
||||
}
|
||||
void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
|
||||
STbData *pTbData = &(STbData){.suid = suid, .uid = uid};
|
||||
|
||||
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid) {
|
||||
STbData *pTbData;
|
||||
|
||||
taosRLockLatch(&pMemTable->latch);
|
||||
void *p = taosArraySearch(pMemTable->aTbData, &pTbData, tbDataPCmprFn, TD_EQ);
|
||||
pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
|
||||
taosRUnLockLatch(&pMemTable->latch);
|
||||
|
||||
*ppTbData = p ? *(STbData **)p : NULL;
|
||||
return pTbData;
|
||||
}
|
||||
|
||||
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock,
|
||||
|
@ -318,18 +314,44 @@ _exit:
|
|||
return pIter->pRow;
|
||||
}
|
||||
|
||||
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t nBucket = pMemTable->nBucket * 2;
|
||||
STbData **aBucket = (STbData **)taosMemoryCalloc(nBucket, sizeof(STbData *));
|
||||
if (aBucket == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
|
||||
STbData *pTbData = pMemTable->aBucket[iBucket];
|
||||
|
||||
while (pTbData) {
|
||||
STbData *pNext = pTbData->next;
|
||||
|
||||
int32_t idx = TABS(pTbData->uid) % nBucket;
|
||||
pTbData->next = aBucket[idx];
|
||||
aBucket[idx] = pTbData;
|
||||
|
||||
pTbData = pNext;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pMemTable->aBucket);
|
||||
pMemTable->nBucket = nBucket;
|
||||
pMemTable->aBucket = aBucket;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
|
||||
int32_t code = 0;
|
||||
int32_t idx = 0;
|
||||
STbData *pTbData = NULL;
|
||||
STbData *pTbDataT = &(STbData){.suid = suid, .uid = uid};
|
||||
int32_t code = 0;
|
||||
|
||||
// get
|
||||
idx = taosArraySearchIdx(pMemTable->aTbData, &pTbDataT, tbDataPCmprFn, TD_GE);
|
||||
if (idx >= 0) {
|
||||
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, idx);
|
||||
if (tbDataPCmprFn(&pTbDataT, &pTbData) == 0) goto _exit;
|
||||
}
|
||||
STbData *pTbData = tsdbGetTbDataFromMemTableImpl(pMemTable, suid, uid);
|
||||
if (pTbData) goto _exit;
|
||||
|
||||
// create
|
||||
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
||||
|
@ -365,21 +387,23 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
|||
SL_NODE_FORWARD(pTbData->sl.pTail, iLevel) = NULL;
|
||||
}
|
||||
|
||||
void *p;
|
||||
if (idx < 0) {
|
||||
idx = taosArrayGetSize(pMemTable->aTbData);
|
||||
taosWLockLatch(&pMemTable->latch);
|
||||
|
||||
if (pMemTable->nTbData >= pMemTable->nBucket) {
|
||||
code = tsdbMemTableRehash(pMemTable);
|
||||
if (code) {
|
||||
taosWUnLockLatch(&pMemTable->latch);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
taosWLockLatch(&pMemTable->latch);
|
||||
p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData);
|
||||
int32_t idx = TABS(uid) % pMemTable->nBucket;
|
||||
pTbData->next = pMemTable->aBucket[idx];
|
||||
pMemTable->aBucket[idx] = pTbData;
|
||||
pMemTable->nTbData++;
|
||||
|
||||
taosWUnLockLatch(&pMemTable->latch);
|
||||
|
||||
tsdbDebug("vgId:%d, add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx);
|
||||
|
||||
if (p == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
_exit:
|
||||
*ppTbData = pTbData;
|
||||
return code;
|
||||
|
@ -622,3 +646,41 @@ void tsdbUnrefMemTable(SMemTable *pMemTable) {
|
|||
tsdbMemTableDestroy(pMemTable);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
|
||||
STbData *pTbData1 = *(STbData **)p1;
|
||||
STbData *pTbData2 = *(STbData **)p2;
|
||||
|
||||
if (pTbData1->suid < pTbData2->suid) {
|
||||
return -1;
|
||||
} else if (pTbData1->suid > pTbData2->suid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (pTbData1->uid < pTbData2->uid) {
|
||||
return -1;
|
||||
} else if (pTbData1->uid > pTbData2->uid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
|
||||
SArray *aTbDataP = taosArrayInit(pMemTable->nTbData, sizeof(STbData *));
|
||||
if (aTbDataP == NULL) goto _exit;
|
||||
|
||||
for (int32_t iBucket = 0; iBucket < pMemTable->nBucket; iBucket++) {
|
||||
STbData *pTbData = pMemTable->aBucket[iBucket];
|
||||
|
||||
while (pTbData) {
|
||||
taosArrayPush(aTbDataP, &pTbData);
|
||||
pTbData = pTbData->next;
|
||||
}
|
||||
}
|
||||
|
||||
taosArraySort(aTbDataP, tbDataPCmprFn);
|
||||
|
||||
_exit:
|
||||
return aTbDataP;
|
||||
}
|
||||
|
|
|
@ -1616,7 +1616,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
|||
|
||||
STbData* d = NULL;
|
||||
if (pReader->pReadSnap->pMem != NULL) {
|
||||
tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
|
||||
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
|
||||
if (d != NULL) {
|
||||
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -1637,7 +1637,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
|||
|
||||
STbData* di = NULL;
|
||||
if (pReader->pReadSnap->pIMem != NULL) {
|
||||
tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
|
||||
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
|
||||
if (di != NULL) {
|
||||
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -3103,7 +3103,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
|||
|
||||
STbData* d = NULL;
|
||||
if (pReader->pTsdb->mem != NULL) {
|
||||
tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid, &d);
|
||||
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
|
||||
if (d != NULL) {
|
||||
rows += tsdbGetNRowsInTbData(d);
|
||||
}
|
||||
|
@ -3111,7 +3111,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
|||
|
||||
STbData* di = NULL;
|
||||
if (pReader->pTsdb->imem != NULL) {
|
||||
tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid, &di);
|
||||
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
|
||||
if (di != NULL) {
|
||||
rows += tsdbGetNRowsInTbData(di);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue