refactor some codes
This commit is contained in:
parent
9c1db2eff0
commit
56ed56ca00
|
@ -49,8 +49,6 @@ enum {
|
|||
|
||||
#define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0)
|
||||
|
||||
// static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t
|
||||
// offset, int32_t size);
|
||||
static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset,
|
||||
int32_t size);
|
||||
|
||||
|
@ -70,12 +68,12 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
|
|||
static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult);
|
||||
|
||||
static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey);
|
||||
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo,
|
||||
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
|
||||
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
|
||||
__block_search_fn_t searchFn);
|
||||
|
||||
static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult);
|
||||
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData,
|
||||
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo,
|
||||
SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields,
|
||||
__block_search_fn_t searchFn);
|
||||
|
||||
|
@ -1728,6 +1726,8 @@ int32_t numOfClosedSlidingWindow(SSlidingWindowInfo *pSlidingWindowInfo) {
|
|||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot) {
|
||||
|
@ -2112,7 +2112,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
|
|||
SSlidingWindowInfo *pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo;
|
||||
|
||||
// query completed
|
||||
if (lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery) ||
|
||||
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
closeAllSlidingWindow(pSlidingWindowInfo);
|
||||
|
||||
|
@ -5027,7 +5027,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
static void updatelastkey(SQuery *pQuery, SMeterQueryInfo *pMeterQInfo) { pMeterQInfo->lastKey = pQuery->lastKey; }
|
||||
|
||||
void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32_t blockStatus,
|
||||
SBlockInfo *pBlockBasicInfo, SMeterDataInfo *pDataHeadInfoEx, SField *pFields,
|
||||
SBlockInfo *pBlockBasicInfo, SMeterDataInfo *pMeterDataInfo, SField *pFields,
|
||||
__block_search_fn_t searchFn) {
|
||||
/* cache blocks may be assign to other meter, abort */
|
||||
if (pBlockBasicInfo->size <= 0) {
|
||||
|
@ -5043,18 +5043,17 @@ void queryOnBlock(SMeterQuerySupportObj *pSupporter, int64_t *primaryKeys, int32
|
|||
|
||||
// note: only fixed number of output for each group by operation
|
||||
if (numOfRes > 0) {
|
||||
pSupporter->pResult[pDataHeadInfoEx->groupIdx].numOfRows = numOfRes;
|
||||
pSupporter->pResult[pMeterDataInfo->groupIdx].numOfRows = numOfRes;
|
||||
}
|
||||
|
||||
// used to decide the correct start position in cache after check all data in files
|
||||
updatelastkey(pQuery, pDataHeadInfoEx->pMeterQInfo);
|
||||
updatelastkey(pQuery, pMeterDataInfo->pMeterQInfo);
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
pDataHeadInfoEx->pMeterQInfo->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
|
||||
pMeterDataInfo->pMeterQInfo->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
|
||||
}
|
||||
|
||||
} else {
|
||||
applyIntervalQueryOnBlock(pSupporter, pDataHeadInfoEx, primaryKeys, pBlockBasicInfo, blockStatus, pFields,
|
||||
searchFn);
|
||||
applyIntervalQueryOnBlock(pSupporter, pMeterDataInfo, pBlockBasicInfo, blockStatus, pFields, searchFn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5265,13 +5264,13 @@ static tFilePage *getFilePage(SMeterQuerySupportObj *pSupporter, int32_t pageId)
|
|||
return (tFilePage *)(pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * pageId);
|
||||
}
|
||||
|
||||
static tFilePage *getMeterDataPage(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int32_t pageId) {
|
||||
SMeterQueryInfo *pInfo = pInfoEx->pMeterQInfo;
|
||||
if (pageId >= pInfo->numOfPages) {
|
||||
static tFilePage *getMeterDataPage(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo, int32_t pageId) {
|
||||
SMeterQueryInfo *pMeterQueryInfo = pMeterDataInfo->pMeterQInfo;
|
||||
if (pageId >= pMeterQueryInfo->numOfPages) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t realId = pInfo->pageList[pageId];
|
||||
int32_t realId = pMeterQueryInfo->pageList[pageId];
|
||||
return getFilePage(pSupporter, realId);
|
||||
}
|
||||
|
||||
|
@ -5281,14 +5280,14 @@ typedef struct Position {
|
|||
} Position;
|
||||
|
||||
typedef struct SCompSupporter {
|
||||
SMeterDataInfo ** pInfoEx;
|
||||
SMeterDataInfo ** pMeterDataInfo;
|
||||
Position * pPosition;
|
||||
SMeterQuerySupportObj *pSupporter;
|
||||
} SCompSupporter;
|
||||
|
||||
int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) {
|
||||
Position * pPos = &pSupportor->pPosition[meterIdx];
|
||||
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter, pSupportor->pInfoEx[meterIdx], pPos->pageIdx);
|
||||
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter, pSupportor->pMeterDataInfo[meterIdx], pPos->pageIdx);
|
||||
return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx);
|
||||
}
|
||||
|
||||
|
@ -5311,10 +5310,10 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param
|
|||
return -1;
|
||||
}
|
||||
|
||||
tFilePage *pPageLeft = getMeterDataPage(supportor->pSupporter, supportor->pInfoEx[left], leftPos.pageIdx);
|
||||
tFilePage *pPageLeft = getMeterDataPage(supportor->pSupporter, supportor->pMeterDataInfo[left], leftPos.pageIdx);
|
||||
int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx);
|
||||
|
||||
tFilePage *pPageRight = getMeterDataPage(supportor->pSupporter, supportor->pInfoEx[right], rightPos.pageIdx);
|
||||
tFilePage *pPageRight = getMeterDataPage(supportor->pSupporter, supportor->pMeterDataInfo[right], rightPos.pageIdx);
|
||||
int64_t rightTimestamp = *(int64_t *)(pPageRight->data + TSDB_KEYSIZE * rightPos.rowIdx);
|
||||
|
||||
if (leftTimestamp == rightTimestamp) {
|
||||
|
@ -5465,17 +5464,17 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
|
|||
cs.pPosition[pos].pageIdx += 1; // try next page
|
||||
|
||||
// check if current page is empty or not. if it is empty, ignore it and try next
|
||||
if (cs.pPosition[pos].pageIdx <= cs.pInfoEx[pos]->pMeterQInfo->numOfPages - 1) {
|
||||
if (cs.pPosition[pos].pageIdx <= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages - 1) {
|
||||
tFilePage *newPage = getMeterDataPage(cs.pSupporter, pValidMeter[pos], position->pageIdx);
|
||||
if (newPage->numOfElems <= 0) {
|
||||
// if current source data page is null, it must be the last page of source output page
|
||||
cs.pPosition[pos].pageIdx += 1;
|
||||
assert(cs.pPosition[pos].pageIdx >= cs.pInfoEx[pos]->pMeterQInfo->numOfPages - 1);
|
||||
assert(cs.pPosition[pos].pageIdx >= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages - 1);
|
||||
}
|
||||
}
|
||||
|
||||
// the following code must be executed if current source pages are exhausted
|
||||
if (cs.pPosition[pos].pageIdx >= cs.pInfoEx[pos]->pMeterQInfo->numOfPages) {
|
||||
if (cs.pPosition[pos].pageIdx >= cs.pMeterDataInfo[pos]->pMeterQInfo->numOfPages) {
|
||||
cs.pPosition[pos].pageIdx = -1;
|
||||
cs.pPosition[pos].rowIdx = -1;
|
||||
|
||||
|
@ -5635,7 +5634,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
|
|||
void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) {
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0)) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
|
||||
}
|
||||
|
@ -6693,11 +6692,11 @@ void cleanBlockOrderSupporter(SBlockOrderSupporter *pSupporter, int32_t numOfTab
|
|||
|
||||
int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMeters,
|
||||
SMeterDataBlockInfoEx **pDataBlockInfoEx, int32_t numOfCompBlocks,
|
||||
int32_t *nAllocBlocksInfoSize, int64_t addr) {
|
||||
int32_t *numOfAllocBlocks, int64_t addr) {
|
||||
// release allocated memory first
|
||||
freeDataBlockFieldInfo(*pDataBlockInfoEx, *nAllocBlocksInfoSize);
|
||||
freeDataBlockFieldInfo(*pDataBlockInfoEx, *numOfAllocBlocks);
|
||||
|
||||
if (*nAllocBlocksInfoSize == 0 || *nAllocBlocksInfoSize < numOfCompBlocks) {
|
||||
if (*numOfAllocBlocks == 0 || *numOfAllocBlocks < numOfCompBlocks) {
|
||||
char *tmp = realloc((*pDataBlockInfoEx), sizeof(SMeterDataBlockInfoEx) * numOfCompBlocks);
|
||||
if (tmp == NULL) {
|
||||
tfree(*pDataBlockInfoEx);
|
||||
|
@ -6706,7 +6705,7 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet
|
|||
|
||||
*pDataBlockInfoEx = (SMeterDataBlockInfoEx *)tmp;
|
||||
memset((*pDataBlockInfoEx), 0, sizeof(SMeterDataBlockInfoEx) * numOfCompBlocks);
|
||||
*nAllocBlocksInfoSize = numOfCompBlocks;
|
||||
*numOfAllocBlocks = numOfCompBlocks;
|
||||
}
|
||||
|
||||
SBlockOrderSupporter supporter = {0};
|
||||
|
@ -6740,14 +6739,14 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet
|
|||
supporter.pDataBlockInfoEx[numOfQualMeters] = (SMeterDataBlockInfoEx *)buf;
|
||||
|
||||
for (int32_t k = 0; k < pMeterDataInfo[j]->numOfBlocks; ++k) {
|
||||
SMeterDataBlockInfoEx *pInfoEx = &supporter.pDataBlockInfoEx[numOfQualMeters][k];
|
||||
|
||||
pInfoEx->pBlock.compBlock = &pBlock[k];
|
||||
pInfoEx->pBlock.fields = NULL;
|
||||
|
||||
pInfoEx->pMeterDataInfo = pMeterDataInfo[j];
|
||||
pInfoEx->groupIdx = pMeterDataInfo[j]->groupIdx; // set the group index
|
||||
pInfoEx->blockIndex = pMeterDataInfo[j]->start + k; // set the block index in original meter
|
||||
SMeterDataBlockInfoEx *pBlockInfoEx = &supporter.pDataBlockInfoEx[numOfQualMeters][k];
|
||||
|
||||
pBlockInfoEx->pBlock.compBlock = &pBlock[k];
|
||||
pBlockInfoEx->pBlock.fields = NULL;
|
||||
|
||||
pBlockInfoEx->pMeterDataInfo = pMeterDataInfo[j];
|
||||
pBlockInfoEx->groupIdx = pMeterDataInfo[j]->groupIdx; // set the group index
|
||||
pBlockInfoEx->blockIndex = pMeterDataInfo[j]->start + k; // set the block index in original meter
|
||||
cnt++;
|
||||
}
|
||||
|
||||
|
@ -7013,7 +7012,7 @@ int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int3
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo,
|
||||
static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
|
||||
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
|
||||
__block_search_fn_t searchFn) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
|
@ -7029,10 +7028,10 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
assert(steps > 0);
|
||||
|
||||
// NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range
|
||||
if (pInfo->lastResRows == 0) {
|
||||
pInfo->lastResRows = numOfRes;
|
||||
if (pMeterQueryInfo->lastResRows == 0) {
|
||||
pMeterQueryInfo->lastResRows = numOfRes;
|
||||
} else {
|
||||
assert(pInfo->lastResRows == 1);
|
||||
assert(pMeterQueryInfo->lastResRows == 1);
|
||||
}
|
||||
|
||||
int32_t pos = pQuery->pos + steps * factor;
|
||||
|
@ -7071,20 +7070,20 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
if (pQuery->lastKey > pSupporter->rawEKey || nextKey > pSupporter->rawEKey) {
|
||||
/* whole query completed, save result and abort */
|
||||
assert(queryCompleted);
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
|
||||
// save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache.
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
} else if (pQuery->ekey == pBlockInfo->keyLast) {
|
||||
/* current interval query is completed, set the next query range on other data blocks if exist */
|
||||
int64_t prevEKey = pQuery->ekey;
|
||||
|
||||
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
|
||||
assert(queryCompleted && prevEKey < pQuery->skey);
|
||||
if (pInfo->lastResRows > 0) {
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
if (pMeterQueryInfo->lastResRows > 0) {
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
|
@ -7095,7 +7094,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
* With the information of the directly next data block, whether locates in cache or disk,
|
||||
* current interval query being completed or not can be decided.
|
||||
*/
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey);
|
||||
|
||||
/*
|
||||
|
@ -7103,7 +7102,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
* merge with other meters in the same group
|
||||
*/
|
||||
if (queryCompleted) {
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7119,23 +7118,23 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
if (pQuery->lastKey < pSupporter->rawEKey || (nextKey < pSupporter->rawEKey && nextKey != -1)) {
|
||||
/* whole query completed, save result and abort */
|
||||
assert(queryCompleted);
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
|
||||
/*
|
||||
* save the pQuery->lastKey for retrieve data in cache, actually,
|
||||
* there will be no qualified data in cache.
|
||||
*/
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
} else if (pQuery->ekey == pBlockInfo->keyFirst) {
|
||||
// current interval query is completed, set the next query range on other data blocks if exist
|
||||
int64_t prevEKey = pQuery->ekey;
|
||||
|
||||
getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
|
||||
assert(queryCompleted && prevEKey > pQuery->skey);
|
||||
if (pInfo->lastResRows > 0) {
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
if (pMeterQueryInfo->lastResRows > 0) {
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
|
@ -7146,7 +7145,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
* With the information of the directly next data block, whether locates in cache or disk,
|
||||
* current interval query being completed or not can be decided.
|
||||
*/
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
assert(pQuery->lastKey < pBlockInfo->keyFirst && pQuery->lastKey >= pQuery->ekey);
|
||||
|
||||
/*
|
||||
|
@ -7154,7 +7153,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
* flag, and merge with other meters in the same group
|
||||
*/
|
||||
if (queryCompleted) {
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7163,14 +7162,14 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
}
|
||||
|
||||
assert(queryCompleted);
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
|
||||
assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
|
||||
|
||||
/* still in the same block to query */
|
||||
getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
|
||||
int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order);
|
||||
assert(newPos == pQuery->pos + steps * factor);
|
||||
|
@ -7179,26 +7178,25 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
|
|||
}
|
||||
}
|
||||
|
||||
static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo,
|
||||
static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo,
|
||||
SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields,
|
||||
__block_search_fn_t searchFn) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
int64_t nextKey = -1;
|
||||
bool completed = false;
|
||||
|
||||
while (1) {
|
||||
int64_t nextKey = -1;
|
||||
int32_t numOfRes = 0;
|
||||
|
||||
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryCol, pFields, searchFn, &numOfRes);
|
||||
assert(steps > 0);
|
||||
|
||||
// NOTE: in case of stable query, only ONE(or ZERO) row of result generated for each query range
|
||||
if (pInfo->lastResRows == 0) {
|
||||
pInfo->lastResRows = numOfRes;
|
||||
if (pMeterQueryInfo->lastResRows == 0) {
|
||||
pMeterQueryInfo->lastResRows = numOfRes;
|
||||
} else {
|
||||
assert(pInfo->lastResRows == 1);
|
||||
assert(pMeterQueryInfo->lastResRows == 1);
|
||||
}
|
||||
|
||||
int32_t pos = pQuery->pos + steps * factor;
|
||||
|
@ -7212,6 +7210,7 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM
|
|||
}
|
||||
|
||||
// all data satisfy current query are checked, query completed
|
||||
bool completed = false;
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
completed = (pQuery->lastKey > pQuery->ekey);
|
||||
} else {
|
||||
|
@ -7232,7 +7231,7 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM
|
|||
* With the information of the directly next data block, whether locates in cache or disk,
|
||||
* current interval query being completed or not can be decided.
|
||||
*/
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
assert(pQuery->lastKey > pBlockInfo->keyLast && pQuery->lastKey <= pQuery->ekey);
|
||||
|
@ -7247,36 +7246,28 @@ static void doApplyIntervalQueryOnBlock_rv(SMeterQuerySupportObj *pSupporter, SM
|
|||
|
||||
if (pQuery->ekey == pSupporter->rawEKey) {
|
||||
/* whole query completed, save result and abort */
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
|
||||
// save the pQuery->lastKey for retrieve data in cache, actually, there will be no qualified data in cache.
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
|
||||
return;
|
||||
} else if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->ekey == pBlockInfo->keyLast) ||
|
||||
(QUERY_IS_ASC_QUERY(pQuery) && pQuery->ekey == pBlockInfo->keyFirst)) {
|
||||
// /* current interval query is completed, set the next query range on other data blocks if exist */
|
||||
// int64_t prevEKey = pQuery->ekey;
|
||||
//
|
||||
// getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
//
|
||||
// assert(prevEKey < pQuery->skey);
|
||||
// if (pInfo->lastResRows > 0) {
|
||||
// saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
// }
|
||||
//
|
||||
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->ekey == pBlockInfo->keyFirst)) {
|
||||
/* current interval query is completed, set the next query range on other data blocks if exist */
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
saveResult(pSupporter, pInfo, pInfo->lastResRows);
|
||||
|
||||
saveResult(pSupporter, pMeterQueryInfo, pMeterQueryInfo->lastResRows);
|
||||
|
||||
assert(pos >= 0 && pos < pBlockInfo->size);
|
||||
assert((nextKey >= pQuery->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
|
||||
|
||||
/* still in the same block to query */
|
||||
getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
|
||||
int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order);
|
||||
assert(newPos == pQuery->pos + steps * factor);
|
||||
|
@ -7691,19 +7682,20 @@ void copyFromGroupBuf(SQInfo *pQInfo, SOutputRes *result) {
|
|||
assert(pQuery->pointsRead <= pQuery->pointsToRead);
|
||||
}
|
||||
|
||||
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData,
|
||||
static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo,
|
||||
SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields,
|
||||
__block_search_fn_t searchFn) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
SMeterQueryInfo * pInfo = pInfoEx->pMeterQInfo;
|
||||
SMeterQueryInfo * pMeterQueryInfo = pMeterDataInfo->pMeterQInfo;
|
||||
|
||||
int64_t* pPrimaryKey = (int64_t*) pRuntimeEnv->primaryColBuffer->data;
|
||||
/*
|
||||
* for each block, we need to handle the previous query, since the determination of previous query being completed
|
||||
* or not is based on the start key of current block.
|
||||
*/
|
||||
TSKEY key = getNextAccessedKeyInData(pQuery, pPrimaryData, pBlockInfo, blockStatus);
|
||||
setIntervalQueryRange(pInfoEx->pMeterQInfo, pSupporter, key);
|
||||
TSKEY key = getNextAccessedKeyInData(pQuery, pPrimaryKey, pBlockInfo, blockStatus);
|
||||
setIntervalQueryRange(pMeterDataInfo->pMeterQInfo, pSupporter, key);
|
||||
|
||||
if (((pQuery->skey > pQuery->ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
((pQuery->skey < pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
|
@ -7714,18 +7706,18 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD
|
|||
((pBlockInfo->keyFirst > pQuery->ekey) && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
int32_t numOfRes = 0;
|
||||
/* current block is included in this interval */
|
||||
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryData, pFields, searchFn, &numOfRes);
|
||||
int32_t steps = applyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pPrimaryKey, pFields, searchFn, &numOfRes);
|
||||
assert(numOfRes <= 1 && numOfRes >= 0 && steps > 0);
|
||||
|
||||
if (pInfo->lastResRows == 0) {
|
||||
pInfo->lastResRows = numOfRes;
|
||||
if (pMeterQueryInfo->lastResRows == 0) {
|
||||
pMeterQueryInfo->lastResRows = numOfRes;
|
||||
} else {
|
||||
assert(pInfo->lastResRows == 1);
|
||||
assert(pMeterQueryInfo->lastResRows == 1);
|
||||
}
|
||||
|
||||
saveIntervalQueryRange(pRuntimeEnv, pInfo);
|
||||
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
|
||||
} else {
|
||||
doApplyIntervalQueryOnBlock(pSupporter, pInfo, pBlockInfo, pPrimaryData, pFields, searchFn);
|
||||
doApplyIntervalQueryOnBlock(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue