fix(query): opt perf in query last file blocks.

This commit is contained in:
Haojun Liao 2022-09-07 13:18:34 +08:00
parent cf46b22267
commit e7149cce3a
2 changed files with 174 additions and 68 deletions

View File

@ -120,6 +120,92 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
return NULL; return NULL;
} }
/*
 * Widen a hit found by binarySearchForStartBlock() to the outermost block that still contains `uid`.
 *
 * Starting at `index`, the scan moves one block at a time while the [minUid, maxUid] range of the
 * visited block contains `uid`:
 *   - backward == 0: moves towards index 0 and returns the lowest matching index;
 *   - backward != 0: moves towards index num - 1 and returns the highest matching index.
 *
 * @param index      position of a block whose range is already known to contain `uid`
 * @param uid        table uid being looked up
 * @param pBlockList block list to scan
 * @param num        number of entries in pBlockList; the scan never reads outside [0, num)
 * @param backward   scan direction selector, see above
 * @return index of the last block visited whose range contains `uid`
 */
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk* pBlockList, int32_t num,
                                              int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;

  // the bounds must be tested first: pBlockList[i] may only be read once i is known to be in [0, num)
  while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
    i += step;
  }

  // i is one step past the last matching block
  return i - step;
}

/*
 * Binary search for the block that a scan of table `uid` has to start with.
 *
 * @param pBlockList block list to search
 * @param num        number of entries in pBlockList
 * @param uid        table uid being looked up
 * @param backward   0: return the lowest index of the run of blocks containing `uid`,
 *                   non-zero: return the highest index of that run
 * @return the block index, or -1 if the list is empty or no block range contains `uid`
 *
 * NOTE(review): the halving logic only works if the blocks are ordered by uid range, and the search
 * does not look at the suid of a block -- confirm that the caller passes a list that satisfies both.
 */
static int32_t binarySearchForStartBlock(SSttBlk* pBlockList, int32_t num, uint64_t uid, int32_t backward) {
  if (num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the uid range covered by the whole list
  if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) {
    return -1;
  }

  while (1) {
    if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) {
      return findEarliestIndex(firstPos, uid, pBlockList, num, backward);
    }

    // uid falls into a gap between blocks of the remaining window
    if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < pBlockList[midPos].minUid) {
      lastPos = midPos - 1;
    } else if (uid > pBlockList[midPos].maxUid) {
      firstPos = midPos + 1;
    } else {
      return findEarliestIndex(midPos, uid, pBlockList, num, backward);
    }
  }
}
/*
 * Widen a hit found by binarySearchForStartRowIndex() to the outermost row that still equals `uid`.
 *
 * Starting at `index`, the scan moves one row at a time while uidList[i] == uid:
 *   - backward == 0: moves towards index 0 and returns the lowest matching index;
 *   - backward != 0: moves towards index num - 1 and returns the highest matching index.
 *
 * @param index    position of a row already known to equal `uid`
 * @param uid      table uid being looked up
 * @param uidList  uid column to scan
 * @param num      number of entries in uidList; the scan never reads outside [0, num)
 * @param backward scan direction selector, see above
 * @return index of the last row visited that equals `uid`
 */
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t* uidList, int32_t num,
                                            int32_t backward) {
  int32_t i = index;
  int32_t step = backward ? 1 : -1;

  // the bounds must be tested first: uidList[i] may only be read once i is known to be in [0, num)
  while (i >= 0 && i < num && uid == uidList[i]) {
    i += step;
  }

  // i is one step past the last matching row
  return i - step;
}

/*
 * Binary search for the row that a scan of table `uid` has to start with.
 *
 * @param uidList  uid column to search; the halving logic relies on it being in ascending order
 * @param num      number of entries in uidList
 * @param uid      table uid being looked up
 * @param backward 0: return the index of the first row equal to `uid`,
 *                 non-zero: return the index of the last row equal to `uid`
 * @return the row index, or -1 if the list is empty or does not contain `uid`
 */
static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint64_t uid, int32_t backward) {
  // nothing to search; also keeps uidList[num - 1] below from reading uidList[-1]
  if (num <= 0) {
    return -1;
  }

  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  // uid lies outside the value range of the whole list
  if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
    return -1;
  }

  while (1) {
    if (uid == uidList[firstPos]) {
      return findEarliestRow(firstPos, uid, uidList, num, backward);
    }

    // uid falls between two neighbouring values of the remaining window
    if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
      return -1;
    }

    int32_t numOfRows = lastPos - firstPos + 1;
    int32_t midPos = (numOfRows >> 1u) + firstPos;

    if (uid < uidList[midPos]) {
      lastPos = midPos - 1;
    } else if (uid > uidList[midPos]) {
      firstPos = midPos + 1;
    } else {
      return findEarliestRow(midPos, uid, uidList, num, backward);
    }
  }
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) { uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
int32_t code = 0; int32_t code = 0;
@ -147,33 +233,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block // find the start block
int32_t index = -1; int32_t index = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
if (!backward) { // asc
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
if (p->suid != suid) {
continue;
}
if (p->minUid <= uid && p->maxUid >= uid) {
index = i;
break;
}
}
} else { // desc
for (int32_t i = size - 1; i >= 0; --i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
if (p->suid != suid) {
continue;
}
if (p->minUid <= uid && p->maxUid >= uid) {
index = i;
break;
}
}
}
(*pIter)->iSttBlk = index; (*pIter)->iSttBlk = index;
if (index != -1) { if (index != -1) {
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
@ -193,7 +253,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter->iSttBlk += step; pIter->iSttBlk += step;
int32_t index = -1; int32_t index = -1;
size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); size_t size = pIter->pBlockLoadInfo->aSttBlk->size;//taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i); SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
if ((!pIter->backward) && p->minUid > pIter->uid) { if ((!pIter->backward) && p->minUid > pIter->uid) {
@ -232,9 +292,8 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
} }
} }
if (index == -1) {
pIter->pSttBlk = NULL; pIter->pSttBlk = NULL;
} else { if (index != -1) {
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
} }
} }
@ -247,18 +306,23 @@ static void findNextValidRow(SLDataIter *pIter) {
SBlockData *pBlockData = loadBlockIfMissing(pIter); SBlockData *pBlockData = loadBlockIfMissing(pIter);
// mostly we only need to find the start position for a given table
if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && pBlockData->aUid != NULL) {
i = binarySearchForStartRowIndex((uint64_t*)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
}
for (; i < pBlockData->nRow && i >= 0; i += step) { for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) { if (pBlockData->aUid != NULL) {
if (!pIter->backward) { if (!pIter->backward) {
if (pBlockData->aUid[i] < pIter->uid) { /*if (pBlockData->aUid[i] < pIter->uid) {
continue; continue;
} else if (pBlockData->aUid[i] > pIter->uid) { } else */if (pBlockData->aUid[i] > pIter->uid) {
break; break;
} }
} else { } else {
if (pBlockData->aUid[i] > pIter->uid) { /*if (pBlockData->aUid[i] > pIter->uid) {
continue; continue;
} else if (pBlockData->aUid[i] < pIter->uid) { } else */if (pBlockData->aUid[i] < pIter->uid) {
break; break;
} }
} }

View File

@ -1238,6 +1238,38 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return false; return false;
} }
/*
 * Advance the merge tree of the last-block reader to the next row that hasBeenDropped() does not
 * report as dropped.
 *
 * @return true if the merge tree now points at such a row, false once tMergeTreeNext() reports
 *         that there is no further row
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY key = TSDBROW_KEY(&current);

    bool dropped =
        hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &key, pLastBlockReader->order);
    if (!dropped) {
      return true;
    }
    // dropped rows are skipped, keep advancing
  }

  return false;
}
/*
 * Try to emit `fRow` directly, without going through the row merger.
 *
 * The last-block reader is advanced by one valid row first. If there is no further row, or the
 * key of the row it now points at differs from `ts`, `fRow` is appended to pReader->pResBlock.
 *
 * @return true if `fRow` was appended, false if the next row carries the same timestamp `ts`
 *         (nothing is appended in that case; note the reader has already been advanced)
 */
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
                                           STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
  bool hasNext = nextRowFromLastBlocks(pLastBlockReader, pScanInfo);

  // a following row with an identical timestamp means the caller has more work to do on this key
  if (hasNext && getCurrentKeyInLastBlock(pLastBlockReader) == ts) {
    return false;
  }

  doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
  return true;
}
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
// always set the newest schema version in pReader->pSchema // always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
@ -1389,8 +1421,13 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
// only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
return TSDB_CODE_SUCCESS;
} else {
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
@ -1408,6 +1445,27 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tRowMergerClear(&merge);
}
} else { // not merge block data
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
// merge with block data if ts == key
if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1858,21 +1916,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
/*
 * Advance the merge tree of the last-block reader to the next row that hasBeenDropped() does not
 * report as dropped.
 *
 * @return true if the merge tree now points at such a row, false once tMergeTreeNext() reports
 *         that there is no further row
 */
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
  while (tMergeTreeNext(&pLastBlockReader->mergeTree)) {
    TSDBROW current = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
    TSDBKEY key = TSDBROW_KEY(&current);

    bool dropped =
        hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &key, pLastBlockReader->order);
    if (!dropped) {
      return true;
    }
    // dropped rows are skipped, keep advancing
  }

  return false;
}
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table. // the last block reader has been initialized for this table.
if (pLBlockReader->uid == pScanInfo->uid) { if (pLBlockReader->uid == pScanInfo->uid) {
@ -1906,8 +1949,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY key = TSDBROW_KEY(&row); return TSDBROW_TS(&row);
return key.ts;
} }
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }