refactor(query): opt tsdb read perf.
This commit is contained in:
parent
717e6706bb
commit
1f645ab1d9
|
@ -192,6 +192,9 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
|
||||||
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
|
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
||||||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||||
|
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
|
|
||||||
|
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||||
|
|
||||||
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
@ -682,12 +685,154 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||||
|
|
||||||
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
|
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
|
||||||
|
|
||||||
|
int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) {
|
||||||
|
int32_t midPos = -1;
|
||||||
|
int32_t numOfRows;
|
||||||
|
|
||||||
|
ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
||||||
|
|
||||||
|
TSKEY* keyList = (TSKEY*)pValue;
|
||||||
|
int32_t firstPos = 0;
|
||||||
|
int32_t lastPos = num - 1;
|
||||||
|
|
||||||
|
if (order == TSDB_ORDER_DESC) {
|
||||||
|
// find the first position which is smaller than the key
|
||||||
|
while (1) {
|
||||||
|
if (key >= keyList[firstPos]) return firstPos;
|
||||||
|
if (key == keyList[lastPos]) return lastPos;
|
||||||
|
|
||||||
|
if (key < keyList[lastPos]) {
|
||||||
|
lastPos += 1;
|
||||||
|
if (lastPos >= num) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return lastPos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfRows = lastPos - firstPos + 1;
|
||||||
|
midPos = (numOfRows >> 1) + firstPos;
|
||||||
|
|
||||||
|
if (key < keyList[midPos]) {
|
||||||
|
firstPos = midPos + 1;
|
||||||
|
} else if (key > keyList[midPos]) {
|
||||||
|
lastPos = midPos - 1;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// find the first position which is bigger than the key
|
||||||
|
while (1) {
|
||||||
|
if (key <= keyList[firstPos]) return firstPos;
|
||||||
|
if (key == keyList[lastPos]) return lastPos;
|
||||||
|
|
||||||
|
if (key > keyList[lastPos]) {
|
||||||
|
lastPos = lastPos + 1;
|
||||||
|
if (lastPos >= num)
|
||||||
|
return -1;
|
||||||
|
else
|
||||||
|
return lastPos;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfRows = lastPos - firstPos + 1;
|
||||||
|
midPos = (numOfRows >> 1u) + firstPos;
|
||||||
|
|
||||||
|
if (key < keyList[midPos]) {
|
||||||
|
lastPos = midPos - 1;
|
||||||
|
} else if (key > keyList[midPos]) {
|
||||||
|
firstPos = midPos + 1;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return midPos;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
|
||||||
|
// start end position
|
||||||
|
int s, e;
|
||||||
|
s = pos;
|
||||||
|
|
||||||
|
// check
|
||||||
|
assert(pos >=0 && pos < num);
|
||||||
|
assert(num > 0);
|
||||||
|
|
||||||
|
if (order == TSDB_ORDER_ASC) {
|
||||||
|
// find the first position which is smaller than the key
|
||||||
|
e = num - 1;
|
||||||
|
if (key < keyList[pos])
|
||||||
|
return -1;
|
||||||
|
while (1) {
|
||||||
|
// check can return
|
||||||
|
if (key >= keyList[e])
|
||||||
|
return e;
|
||||||
|
if (key <= keyList[s])
|
||||||
|
return s;
|
||||||
|
if (e - s <= 1)
|
||||||
|
return s;
|
||||||
|
|
||||||
|
// change start or end position
|
||||||
|
int mid = s + (e - s + 1)/2;
|
||||||
|
if (keyList[mid] > key)
|
||||||
|
e = mid;
|
||||||
|
else if(keyList[mid] < key)
|
||||||
|
s = mid;
|
||||||
|
else
|
||||||
|
return mid;
|
||||||
|
}
|
||||||
|
} else { // DESC
|
||||||
|
// find the first position which is bigger than the key
|
||||||
|
e = 0;
|
||||||
|
if (key > keyList[pos])
|
||||||
|
return -1;
|
||||||
|
while (1) {
|
||||||
|
// check can return
|
||||||
|
if (key <= keyList[e])
|
||||||
|
return e;
|
||||||
|
if (key >= keyList[s])
|
||||||
|
return s;
|
||||||
|
if (s - e <= 1)
|
||||||
|
return s;
|
||||||
|
|
||||||
|
// change start or end position
|
||||||
|
int mid = s - (s - e + 1)/2;
|
||||||
|
if (keyList[mid] < key)
|
||||||
|
e = mid;
|
||||||
|
else if(keyList[mid] > key)
|
||||||
|
s = mid;
|
||||||
|
else
|
||||||
|
return mid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
|
||||||
|
// NOTE: reverse the order to find the end position in data block
|
||||||
|
int32_t endPos = -1;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
|
|
||||||
|
if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
|
||||||
|
endPos = pBlock->nRow - 1;
|
||||||
|
} else if (!asc && pReader->window.skey <= pBlock->minKey.ts) {
|
||||||
|
endPos = 0;
|
||||||
|
} else {
|
||||||
|
endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order);
|
||||||
|
assert(endPos != -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return endPos;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
|
||||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
||||||
|
@ -700,23 +845,36 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
|
|
||||||
int32_t rowIndex = 0;
|
if (asc && pReader->window.skey <= pBlock->minKey.ts) {
|
||||||
int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1);
|
pDumpInfo->rowIndex = 0;
|
||||||
|
} else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
|
||||||
int32_t endIndex = 0;
|
pDumpInfo->rowIndex = pBlock->nRow - 1;
|
||||||
if (remain <= pReader->capacity) {
|
|
||||||
endIndex = pBlockData->nRow;
|
|
||||||
} else {
|
} else {
|
||||||
endIndex = pDumpInfo->rowIndex + step * pReader->capacity;
|
int32_t pos = asc? pBlock->nRow-1:0;
|
||||||
|
int32_t order = (pReader->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||||
|
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order);
|
||||||
|
}
|
||||||
|
|
||||||
|
// time window check
|
||||||
|
int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex) + step;
|
||||||
|
int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
||||||
|
if (remain > pReader->capacity) { // output buffer check
|
||||||
remain = pReader->capacity;
|
remain = pReader->capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t rowIndex = 0;
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
|
if (asc) {
|
||||||
colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false);
|
memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain);
|
||||||
|
} else {
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
|
||||||
|
colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -730,13 +888,32 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
if (pData->cid < pColData->info.colId) {
|
if (pData->cid < pColData->info.colId) {
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
} else if (pData->cid == pColData->info.colId) {
|
} else if (pData->cid == pColData->info.colId) {
|
||||||
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
|
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL) {
|
||||||
tColDataGetValue(pData, j, &cv);
|
colDataAppendNNULL(pColData, 0, remain);
|
||||||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
} else {
|
||||||
|
if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
|
||||||
|
uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
|
||||||
|
memcpy(pColData->pData, p, remain);
|
||||||
|
|
||||||
|
// null value exists, check one-by-one
|
||||||
|
if (pData->flag != HAS_VALUE) {
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
|
||||||
|
uint8_t v = GET_BIT2(pData->pBitMap, j);
|
||||||
|
if (v == 0 || v == 1) {
|
||||||
|
colDataSetNull_f(pColData->nullbitmap, rowIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
|
||||||
|
tColDataGetValue(pData, j, &cv);
|
||||||
|
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
i += 1;
|
i += 1;
|
||||||
ASSERT(rowIndex == remain);
|
|
||||||
} else { // the specified column does not exist in file block, fill with null data
|
} else { // the specified column does not exist in file block, fill with null data
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, remain);
|
||||||
i += 1;
|
i += 1;
|
||||||
|
@ -752,7 +929,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
pResBlock->info.rows = remain;
|
pResBlock->info.rows = remain;
|
||||||
pDumpInfo->rowIndex += step * remain;
|
pDumpInfo->rowIndex += step * remain;
|
||||||
|
|
||||||
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
|
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
|
||||||
|
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||||
|
setBlockAllDumped(pDumpInfo, ts, pReader->order);
|
||||||
|
} else {
|
||||||
|
int64_t k = asc? pBlock->maxKey.ts:pBlock->minKey.ts;
|
||||||
|
setBlockAllDumped(pDumpInfo, k, pReader->order);
|
||||||
|
}
|
||||||
|
|
||||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
pReader->cost.blockLoadTime += elapsedTime;
|
pReader->cost.blockLoadTime += elapsedTime;
|
||||||
|
@ -760,7 +943,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||||
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||||
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
|
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
|
||||||
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1148,26 +1331,31 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. the version of all rows should be less than the endVersion
|
typedef struct {
|
||||||
// 2. current block should not overlap with next neighbor block
|
bool overlapWithNeighborBlock;
|
||||||
// 3. current timestamp should not be overlap with each other
|
bool hasDupTs;
|
||||||
// 4. output buffer should be large enough to hold all rows in current block
|
bool overlapWithDelInfo;
|
||||||
// 5. delete info should not overlap with current block data
|
bool overlapWithLastBlock;
|
||||||
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock,
|
bool overlapWithKeyInBuf;
|
||||||
STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
|
bool partiallyRequired;
|
||||||
|
bool moreThanCapcity;
|
||||||
|
} SDataBlockToLoadInfo;
|
||||||
|
|
||||||
|
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
|
||||||
|
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
|
||||||
|
STsdbReader* pReader) {
|
||||||
int32_t neighborIndex = 0;
|
int32_t neighborIndex = 0;
|
||||||
SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
|
SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order);
|
||||||
|
|
||||||
// overlap with neighbor
|
// overlap with neighbor
|
||||||
bool overlapWithNeighbor = false;
|
|
||||||
if (pNeighbor) {
|
if (pNeighbor) {
|
||||||
overlapWithNeighbor = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
|
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order);
|
||||||
taosMemoryFree(pNeighbor);
|
taosMemoryFree(pNeighbor);
|
||||||
}
|
}
|
||||||
|
|
||||||
// has duplicated ts of different version in this block
|
// has duplicated ts of different version in this block
|
||||||
bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
|
pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
|
||||||
bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
|
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
|
||||||
|
|
||||||
// todo here we need to each key in the last files to identify if it is really overlapped with last block
|
// todo here we need to each key in the last files to identify if it is really overlapped with last block
|
||||||
// todo
|
// todo
|
||||||
|
@ -1179,25 +1367,48 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
|
pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
|
||||||
bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
||||||
bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);
|
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
|
||||||
|
}
|
||||||
|
|
||||||
bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
|
// 1. the version of all rows should be less than the endVersion
|
||||||
moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);
|
// 2. current block should not overlap with next neighbor block
|
||||||
|
// 3. current timestamp should not be overlap with each other
|
||||||
|
// 4. output buffer should be large enough to hold all rows in current block
|
||||||
|
// 5. delete info should not overlap with current block data
|
||||||
|
// 6. current block should not contain the duplicated ts
|
||||||
|
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
|
||||||
|
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
|
||||||
|
SDataBlockToLoadInfo info = {0};
|
||||||
|
getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||||
|
|
||||||
|
bool loadDataBlock =
|
||||||
|
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
|
||||||
|
info.moreThanCapcity || info.overlapWithDelInfo || info.overlapWithLastBlock);
|
||||||
|
|
||||||
// log the reason why load the datablock for profile
|
// log the reason why load the datablock for profile
|
||||||
if (loadDataBlock) {
|
if (loadDataBlock) {
|
||||||
tsdbDebug("%p uid:%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64
|
||||||
" need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
|
" need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
|
||||||
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
|
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
|
||||||
pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
|
pReader, pBlockInfo->uid, info.overlapWithNeighborBlock, info.hasDupTs, info.partiallyRequired,
|
||||||
moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
|
info.overlapWithKeyInBuf, info.moreThanCapcity, info.overlapWithDelInfo, info.overlapWithLastBlock,
|
||||||
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
return loadDataBlock;
|
return loadDataBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
|
||||||
|
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
|
||||||
|
SDataBlockToLoadInfo info = {0};
|
||||||
|
getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||||
|
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
|
||||||
|
info.overlapWithDelInfo || info.overlapWithLastBlock);
|
||||||
|
return isCleanFileBlock;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
|
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
|
||||||
if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) {
|
if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1918,8 +2129,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
|
||||||
|
|
||||||
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||||
// the last block reader has been initialized for this table.
|
// the last block reader has been initialized for this table.
|
||||||
if (pLBlockReader->uid == pScanInfo->uid) {
|
if (pLBlockReader->uid == pScanInfo->uid) {
|
||||||
|
@ -1961,12 +2170,11 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLas
|
||||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
|
||||||
|
|
||||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
|
|
||||||
STSRow* pTSRow = NULL;
|
STSRow* pTSRow = NULL;
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
|
||||||
|
@ -1989,13 +2197,19 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
|
||||||
|
|
||||||
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
|
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
|
||||||
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
||||||
} else {
|
} else {
|
||||||
|
TSDBROW *pRow = NULL, *piRow = NULL;
|
||||||
|
if (pBlockScanInfo->iter.hasVal) {
|
||||||
|
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlockScanInfo->iiter.hasVal) {
|
||||||
|
piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
||||||
|
}
|
||||||
|
|
||||||
// imem + file + last block
|
// imem + file + last block
|
||||||
if (pBlockScanInfo->iiter.hasVal) {
|
if (pBlockScanInfo->iiter.hasVal) {
|
||||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
|
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
|
||||||
|
@ -2015,20 +2229,29 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
|
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
|
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||||
if (pBlockInfo != NULL) {
|
if (pBlockInfo != NULL) {
|
||||||
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
} else {
|
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||||
|
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
|
||||||
|
|
||||||
|
// it is a clean block, load it directly
|
||||||
|
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
|
||||||
|
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
} else { // file blocks not exist
|
||||||
pBlockScanInfo = pReader->status.pTableIter;
|
pBlockScanInfo = pReader->status.pTableIter;
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// todo check the validate of row in file block
|
// todo check the validate of row in file block
|
||||||
|
@ -2071,6 +2294,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
pResBlock->info.uid = pBlockScanInfo->uid;
|
pResBlock->info.uid = pBlockScanInfo->uid;
|
||||||
blockDataUpdateTsWindow(pResBlock, 0);
|
blockDataUpdateTsWindow(pResBlock, 0);
|
||||||
|
|
||||||
|
@ -2174,7 +2398,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||||
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
|
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
|
||||||
TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
|
TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
|
||||||
if (pRow != NULL) {
|
if (pRow != NULL) {
|
||||||
|
@ -2370,12 +2594,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||||
TSDBKEY key = getCurrentKeyInBuf(pScanInfo, pReader);
|
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||||
|
|
||||||
if (pBlockInfo == NULL) { // build data block from last data file
|
if (pBlockInfo == NULL) { // build data block from last data file
|
||||||
ASSERT(pBlockIter->numOfBlocks == 0);
|
ASSERT(pBlockIter->numOfBlocks == 0);
|
||||||
code = buildComposedDataBlock(pReader);
|
code = buildComposedDataBlock(pReader);
|
||||||
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
|
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
|
||||||
tBlockDataReset(&pStatus->fileBlockData);
|
tBlockDataReset(&pStatus->fileBlockData);
|
||||||
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
|
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2389,7 +2613,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
// build composed data block
|
// build composed data block
|
||||||
code = buildComposedDataBlock(pReader);
|
code = buildComposedDataBlock(pReader);
|
||||||
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
|
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
|
||||||
// data in memory that are earlier than current file block
|
// data in memory that are earlier than current file block
|
||||||
// todo rows in buffer should be less than the file block in asc, greater than file block in desc
|
// todo rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
||||||
|
@ -3120,11 +3344,11 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
||||||
}
|
}
|
||||||
|
|
||||||
SColVal cv = {0};
|
SColVal cv = {0};
|
||||||
int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
|
int32_t numOfInputCols = pBlockData->aIdx->size;
|
||||||
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
int32_t numOfOutputCols = pResBlock->pDataBlock->size;
|
||||||
|
|
||||||
while (i < numOfOutputCols && j < numOfInputCols) {
|
while (i < numOfOutputCols && j < numOfInputCols) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, i);
|
||||||
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
|
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
|
||||||
|
|
||||||
if (pData->cid < pCol->info.colId) {
|
if (pData->cid < pCol->info.colId) {
|
||||||
|
|
Loading…
Reference in New Issue