|
|
|
@ -956,9 +956,9 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBl
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
|
|
|
|
|
SDataCols* pTsCol = pQueryHandle->rhelper.pDataCols[0];
|
|
|
|
|
if (pCheckInfo->lastKey < pBlock->keyLast) {
|
|
|
|
|
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
|
|
|
|
|
cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
|
|
|
|
|
} else {
|
|
|
|
|
cur->pos = pBlock->numOfRows - 1;
|
|
|
|
|
}
|
|
|
|
@ -1704,7 +1704,32 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
|
|
|
|
|
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists);
|
|
|
|
|
|
|
|
|
|
static int32_t getDataBlockRv(STsdbQueryHandle* pQueryHandle, STableBlockInfo* pNext, bool *exists) {
|
|
|
|
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
|
|
|
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
|
|
|
int32_t code = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || *exists) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
|
|
|
|
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
|
|
|
|
// all data blocks in current file has been checked already, try next file if exists
|
|
|
|
|
return getFirstFileDataBlock(pQueryHandle, exists);
|
|
|
|
|
} else { // next block of the same file
|
|
|
|
|
cur->slot += step;
|
|
|
|
|
cur->mixBlock = false;
|
|
|
|
|
cur->blockCompleted = false;
|
|
|
|
|
pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exists) {
|
|
|
|
|
pQueryHandle->numOfBlocks = 0;
|
|
|
|
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
|
|
|
|
|
|
|
|
@ -1789,7 +1814,23 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
|
|
|
|
|
cur->fid = pQueryHandle->pFileGroup->fileId;
|
|
|
|
|
|
|
|
|
|
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
|
|
|
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo, exists);
|
|
|
|
|
return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
|
|
|
|
|
assert(cur != NULL && numOfBlocks > 0);
|
|
|
|
|
return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
|
|
|
|
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
|
|
|
|
|
|
|
|
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
|
|
|
|
assert(cur->slot < pQueryHandle->numOfBlocks && cur->slot >= 0);
|
|
|
|
|
|
|
|
|
|
cur->slot += step;
|
|
|
|
|
cur->mixBlock = false;
|
|
|
|
|
cur->blockCompleted = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
|
|
|
|
@ -1800,14 +1841,14 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
|
|
|
|
if (!pQueryHandle->locateStart) {
|
|
|
|
|
pQueryHandle->locateStart = true;
|
|
|
|
|
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
|
|
|
|
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
|
|
|
|
|
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
|
|
|
|
|
|
|
|
|
|
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
|
|
|
|
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
|
|
|
|
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
|
|
|
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
|
|
|
|
|
|
|
|
|
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
|
|
|
|
return getFirstFileDataBlock(pQueryHandle, exists);
|
|
|
|
|
} else {
|
|
|
|
|
// check if current file block is all consumed
|
|
|
|
|
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
|
|
@ -1815,27 +1856,26 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
|
|
|
|
|
|
|
|
|
// current block is done, try next
|
|
|
|
|
if ((!cur->mixBlock) || cur->blockCompleted) {
|
|
|
|
|
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
|
|
|
|
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
|
|
|
|
// all data blocks in current file has been checked already, try next file if exists
|
|
|
|
|
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
|
|
|
|
} else {
|
|
|
|
|
// next block of the same file
|
|
|
|
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
|
|
|
|
|
cur->slot += step;
|
|
|
|
|
|
|
|
|
|
cur->mixBlock = false;
|
|
|
|
|
cur->blockCompleted = false;
|
|
|
|
|
|
|
|
|
|
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
|
|
|
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo, exists);
|
|
|
|
|
}
|
|
|
|
|
// all data blocks in current file has been checked already, try next file if exists
|
|
|
|
|
} else {
|
|
|
|
|
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos, pQueryHandle->qinfo);
|
|
|
|
|
tsdbDebug("%p continue in current data block, index:%d, pos:%d, %p", pQueryHandle, cur->slot, cur->pos,
|
|
|
|
|
pQueryHandle->qinfo);
|
|
|
|
|
int32_t code = handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
|
|
|
|
|
*exists = pQueryHandle->realNumOfRows > 0;
|
|
|
|
|
*exists = (pQueryHandle->realNumOfRows > 0);
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || *exists) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// current block is empty, try next block in file
|
|
|
|
|
// all data blocks in current file has been checked already, try next file if exists
|
|
|
|
|
if (isEndFileDataBlock(cur, pQueryHandle->numOfBlocks, ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
|
|
|
|
return getFirstFileDataBlock(pQueryHandle, exists);
|
|
|
|
|
} else {
|
|
|
|
|
moveToNextDataBlockInCurrentFile(pQueryHandle);
|
|
|
|
|
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
|
|
|
return getDataBlockRv(pQueryHandle, pNext, exists);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|