fix bugs found in regression test.

This commit is contained in:
hjxilinx 2020-02-22 00:16:44 +08:00
parent e9dc2db741
commit 54da5e5f7e
4 changed files with 318 additions and 399 deletions

View File

@ -3859,10 +3859,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
for (int32_t i = numOfFillVal; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i);
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type);
if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) {
setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes);
} else {
tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type);
}
}
}

View File

@ -67,10 +67,9 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
static int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv);
// static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
@ -81,6 +80,10 @@ static int32_t getGroupResultId(int32_t groupIndex) {
return base + (groupIndex * 10000);
}
static FORCE_INLINE bool isIntervalQuery(SQuery* pQuery) {
return pQuery->intervalTime > 0;
}
// check the offset value integrity
static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data,
int32_t size) {
@ -244,7 +247,7 @@ static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
}
static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex,
bool loadPrimaryTS) {
bool loadTS) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo;
@ -252,7 +255,7 @@ static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *
if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 &&
pLoadInfo->sid == pMeterObj->sid && pLoadInfo->fileListIndex == fileIndex) {
// previous load operation does not load the primary timestamp column, we only need to load the timestamp column
if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) {
if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadTS) {
return DISK_BLOCK_LOAD_TS;
} else {
return DISK_BLOCK_NO_NEED_TO_LOAD;
@ -917,7 +920,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
int32_t ret = 0;
/* the first round always be 1, the secondary round is determined by queried function */
int32_t round = pRuntimeEnv->scanFlag;
int32_t round = (IS_MASTER_SCAN(pRuntimeEnv)) ? 0 : 1;
while (j < pBlock->numOfCols && i < pQuery->numOfCols) {
if ((*pField)[j].colId < pQuery->colList[i].data.colId) {
@ -1005,16 +1008,6 @@ SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_
return blockInfo;
}
// static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQuery *pQuery) {
// if ((QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyFirst > pQuery->ekey) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyLast < pQuery->ekey)) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return false;
// }
//
// return true;
//}
/**
*
* @param pQuery
@ -1108,6 +1101,7 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeE
return NULL;
}
vnodeFreeFields(pQuery);
getBasicCacheInfoSnapshot(pQuery, pCacheInfo, pMeterObj->vnode);
SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot];
@ -1621,10 +1615,16 @@ static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t s
return forwardStep;
}
/**
* NOTE: the query status only set for the first scan of master scan.
*/
static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) {
return;
}
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { // query completed
// query completed
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllTimeWindow(pWindowResInfo);
@ -1671,7 +1671,6 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
assert(pWindowResInfo->prevSKey != 0);
}
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
@ -1798,6 +1797,23 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
}
}
static TSKEY reviseWindowEkey(SQuery* pQuery, STimeWindow* pWindow) {
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = pWindow->ekey;
if (ekey > pQuery->ekey) {
ekey = pQuery->ekey;
}
} else {
ekey = pWindow->skey;
if (ekey < pQuery->ekey) {
ekey = pQuery->ekey;
}
}
return ekey;
}
/**
*
* @param pRuntimeEnv
@ -1847,7 +1863,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
if (isIntervalQuery(pQuery) && pQuery->slidingTime > 0) {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = primaryKeyCol[offset];
@ -1856,19 +1872,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
return 0;
}
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = win.ekey;
if (ekey > pQuery->ekey) {
ekey = pQuery->ekey;
}
} else {
ekey = win.skey;
if (ekey < pQuery->ekey) {
ekey = pQuery->ekey;
}
}
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
@ -1890,19 +1894,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
break;
}
ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = nextWin.ekey;
if (ekey > pQuery->ekey) {
ekey = pQuery->ekey;
}
} else {
ekey = nextWin.skey;
if (ekey < pQuery->ekey) {
ekey = pQuery->ekey;
}
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true);
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
@ -1929,7 +1921,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
* because the results of group by normal column is put into intermediate buffer.
*/
int32_t num = 0;
if (pQuery->intervalTime == 0) {
if (!isIntervalQuery(pQuery)) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
}
@ -2103,7 +2095,8 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
TSDB_KEYSIZE);
int32_t v = (*p - remain);
int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size);
// todo add the update function for hash table
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
@ -2238,7 +2231,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
}
// in the supplementary scan, only the following functions need to be executed
if (!IS_MASTER_SCAN(pRuntimeEnv) &&
if (IS_SUPPLEMENT_SCAN(pRuntimeEnv) &&
!(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST ||
functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) {
return false;
@ -2327,8 +2320,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
continue;
}
// sliding window query
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) {
// interval window query
if (isIntervalQuery(pQuery)) {
// decide the time window according to the primary timestamp
int64_t ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
@ -2381,6 +2374,9 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
}
}
// update the lastKey
lastKey = primaryKeyCol[offset];
// all startOffset are identical
offset -= pCtx[0].startOffset;
@ -2393,6 +2389,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
}
if (pRuntimeEnv->pTSBuf != NULL) {
// if timestamp filter list is empty, quit current query
if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
@ -2418,7 +2415,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
* because the results of group by normal column is put into intermediate buffer.
*/
int32_t num = 0;
if (!groupbyStateValue && !(pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
if (!groupbyStateValue && !(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
}
@ -2470,10 +2467,12 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY
!QUERY_IS_ASC_QUERY(pQuery)));
}
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo,
int64_t *pPrimaryColumn, SField *pFields, __block_search_fn_t searchFn,
int32_t *numOfRes, SWindowResInfo *pWindowResInfo) {
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, SField *pFields,
__block_search_fn_t searchFn, int32_t *numOfRes,
SWindowResInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
TSKEY * pPrimaryColumn = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
@ -2499,8 +2498,9 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
// interval query with limit applied
if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 &&
pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) {
if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 &&
(pQuery->limit.limit + pQuery->limit.offset) <= numOfClosedTimeWindow(pWindowResInfo) &&
pRuntimeEnv->scanFlag == MASTER_SCAN) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
@ -2723,6 +2723,12 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
}
}
static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interResBytes, isStableQuery);
}
}
static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) {
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery));
@ -2791,11 +2797,10 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
if (i > 0) {
pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes;
}
}
// set the intermediate result output buffer
SResultInfo *pResInfo = &pRuntimeEnv->resultInfo[i];
setResultInfoBuf(pResInfo, pQuery->pSelectExpr[i].interResBytes, isSTableQuery);
}
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, isSTableQuery);
// if it is group by normal column, do not set output buffer, the output buffer is pResult
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isSTableQuery) {
@ -4074,7 +4079,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
* pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is
* not valid. otherwise, we only forward pQuery->limit.offset number of points
*/
if (pQuery->intervalTime > 0) {
if (isIntervalQuery(pQuery)) {
int16_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pMeterObj->searchAlgorithm];
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
@ -4375,9 +4380,9 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter *pPointInterpSupport) {
static void allocMemForInterpo(STableQuerySupportObj *pSupporter, SQuery *pQuery, SMeterObj *pMeterObj) {
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
assert(pQuery->intervalTime > 0 || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery)));
assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery)));
if (pQuery->intervalTime > 0) {
if (isIntervalQuery(pQuery)) {
pSupporter->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutputCols);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
@ -4395,7 +4400,7 @@ static int32_t getInitialPageNum(STableQuerySupportObj *pSupporter) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128;
} else if (pQuery->intervalTime > 0) { // time window query, allocate one page for each table
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
num = pSupporter->numOfMeters;
} else { // for super table query, one page for each subset
num = pSupporter->pSidSet->numOfSubSet;
@ -4560,7 +4565,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
vnodeRecordAllFiles(pQInfo, pMeterObj->vnode);
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
int32_t rows = getInitialPageNum(pSupporter);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
@ -4602,16 +4607,15 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
return TSDB_CODE_SUCCESS;
}
} else { // find the skey and ekey in case of sliding query
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) {
if (isIntervalQuery(pQuery)) {
STimeWindow win = {0};
// getActualRange(pSupporter, &win);
//
// // there is no qualified data with respect to the primary timestamp
// if (win.skey > win.ekey) {
// sem_post(&pQInfo->dataReady);
// pQInfo->over = 1;
// return TSDB_CODE_SUCCESS;
// }
// find the minimum value for descending order query
TSKEY minKey = -1;
if (!QUERY_IS_ASC_QUERY(pQuery)) {
minKey = getGreaterEqualTimestamp(pRuntimeEnv);
}
int64_t skey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
@ -4624,7 +4628,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
}
if (!QUERY_IS_ASC_QUERY(pQuery)) {
win.skey = getGreaterEqualTimestamp(pRuntimeEnv);
win.skey = minKey;
win.ekey = skey;
} else {
win.skey = skey;
@ -4704,9 +4708,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
}
if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) ||
(pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
int32_t size = 0;
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
size = 10000;
} else if (pSupporter->pSidSet != NULL) {
size = pSupporter->pSidSet->numOfSubSet;
@ -5139,31 +5143,39 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl
return DISK_DATA_LOADED;
}
static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn,
int32_t *numOfRes, int32_t blockLoadStatus, int32_t *forwardStep) {
static int32_t doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo,
__block_search_fn_t searchFn, int32_t blockLoadStatus, int32_t *forwardStep) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryCostSummary *pSummary = &pRuntimeEnv->summary;
int32_t numOfRes = 0;
if (IS_DISK_DATA_BLOCK(pQuery) && blockLoadStatus != DISK_DATA_LOADED) {
*forwardStep = pBlockInfo->size;
return numOfRes;
}
SField *pFields = NULL;
if (IS_DISK_DATA_BLOCK(pQuery)) {
pFields = pQuery->pFields[pQuery->slot];
} else { // in case of cache data block, no need to load operation
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
pFields = NULL;
}
TSKEY * primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
int64_t start = taosGetTimestampUs();
*pblockInfo = getBlockInfo(pRuntimeEnv);
*forwardStep =
tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pFields, searchFn, &numOfRes, &pRuntimeEnv->windowResInfo);
int64_t elapsedTime = taosGetTimestampUs() - start;
if (IS_DISK_DATA_BLOCK(pQuery)) {
if (blockLoadStatus == DISK_DATA_LOADED) {
*forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot],
searchFn, numOfRes, &pRuntimeEnv->windowResInfo);
pSummary->fileTimeUs += elapsedTime;
} else {
*forwardStep = pblockInfo->size;
pSummary->cacheTimeUs += elapsedTime;
}
pSummary->fileTimeUs += (taosGetTimestampUs() - start);
} else {
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
*forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes,
&pRuntimeEnv->windowResInfo);
pSummary->cacheTimeUs += (taosGetTimestampUs() - start);
}
return numOfRes;
}
// previous time window may not be of the same size of pQuery->intervalTime
@ -5176,13 +5188,11 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) {
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery * pQuery = pRuntimeEnv->pQuery;
bool LOAD_DATA = true;
int32_t forwardStep = 0;
int64_t cnt = 0;
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
bool LOAD_DATA = true;
int64_t cnt = 0;
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
int32_t blockLoadStatus = DISK_DATA_LOADED;
@ -5204,9 +5214,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
return cnt;
}
int32_t numOfRes = 0;
SBlockInfo blockInfo = {0};
doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, blockLoadStatus, &forwardStep);
int32_t forwardStep = 0;
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
/*int32_t numOfRes = */ doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, blockLoadStatus, &forwardStep);
dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64
", fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d",
@ -5261,7 +5271,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} // while(1)
if (pQuery->intervalTime > 0) {
if (isIntervalQuery(pQuery)) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
} else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed
@ -5331,8 +5341,7 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM
}
}
static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult* pWindowRes, /*int32_t inputIdx,*/
bool mergeFlag) {
static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult *pWindowRes, bool mergeFlag) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
@ -5481,57 +5490,54 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
}
}
static tFilePage *getMeterDataPage(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo,
int32_t index) {
SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
return getResultBufferPageById(pResultBuf, pList.pData[index]);
}
// static tFilePage *getMeterDataPage(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo,
// int32_t index) {
// SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
// return getResultBufferPageById(pResultBuf, pList.pData[index]);
//}
typedef struct Position {
int32_t pageIdx;
int32_t rowIdx;
} Position;
// typedef struct Position {
// int32_t pageIdx;
// int32_t rowIdx;
//} Position;
typedef struct SCompSupporter {
SMeterDataInfo ** pMeterDataInfo;
Position * pPosition;
int32_t * position;
STableQuerySupportObj *pSupporter;
} SCompSupporter;
int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) {
Position * pPos = &pSupportor->pPosition[meterIdx];
tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->runtimeEnv.pResultBuf,
pSupportor->pMeterDataInfo[meterIdx]->pMeterQInfo, pPos->pageIdx);
return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx);
}
int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
SCompSupporter * supporter = (SCompSupporter *)param;
SQueryDiskbasedResultBuf *pResultBuf = supporter->pSupporter->runtimeEnv.pResultBuf;
SQueryRuntimeEnv *pRuntimeEnv = &supporter->pSupporter->runtimeEnv;
Position leftPos = supporter->pPosition[left];
Position rightPos = supporter->pPosition[right];
int32_t leftPos = supporter->position[left];
int32_t rightPos = supporter->position[right];
/* left source is exhausted */
if (leftPos.pageIdx == -1 && leftPos.rowIdx == -1) {
if (leftPos == -1) {
return 1;
}
/* right source is exhausted*/
if (rightPos.pageIdx == -1 && rightPos.rowIdx == -1) {
if (rightPos == -1) {
return -1;
}
//!!!!!
tFilePage *pPageLeft = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[left]->pMeterQInfo, leftPos.pageIdx);
int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx);
SWindowResInfo *pWindowResInfo1 = &supporter->pMeterDataInfo[left]->pMeterQInfo->windowResInfo;
SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos);
tFilePage *pPageRight = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[right]->pMeterQInfo, rightPos.pageIdx);
int64_t rightTimestamp = *(int64_t *)(pPageRight->data + TSDB_KEYSIZE * rightPos.rowIdx);
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1);
TSKEY leftTimestamp = GET_INT64_VAL(b1);
SWindowResInfo *pWindowResInfo2 = &supporter->pMeterDataInfo[right]->pMeterQInfo->windowResInfo;
SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos);
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2);
TSKEY rightTimestamp = GET_INT64_VAL(b2);
if (leftTimestamp == rightTimestamp) {
return 0;
@ -5626,7 +5632,6 @@ void copyResToQueryResultBuf(STableQuerySupportObj *pSupporter, SQuery *pQuery)
int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// bool hasMainFunction = hasMainOutput(pQuery);
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
@ -5636,8 +5641,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* pW
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (/*hasMainFunction &&*/
(functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue;
}
@ -5652,29 +5656,18 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* pW
int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end) {
// calculate the maximum required space
if (pSupporter->groupResultSize == 0) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pSupporter->groupResultSize += sizeof(tFilePage) + pQuery->pointsToRead * pRuntimeEnv->pCtx[i].outputBytes;
}
}
tFilePage ** buffer = (tFilePage **)pQuery->sdata;
Position * posList = calloc(1, sizeof(Position) * (end - start));
int32_t * posList = calloc((end - start), sizeof(int32_t));
SMeterDataInfo **pTableList = malloc(POINTER_BYTES * (end - start));
// todo opt for the case of one table per group
int32_t numOfMeters = 0;
for (int32_t i = start; i < end; ++i) {
int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid);
if (list.size > 0 && pMeterDataInfo[i].pMeterQInfo->windowResInfo.size > 0) {
pTableList[numOfMeters] = &pMeterDataInfo[i];
// set the merge start position: page:0, index:0
posList[numOfMeters].pageIdx = 0;
posList[numOfMeters].rowIdx = 0;
numOfMeters += 1;
}
}
@ -5682,43 +5675,40 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
if (numOfMeters == 0) {
tfree(posList);
tfree(pTableList);
assert(pSupporter->numOfGroupResultPages == 0);
return 0;
}
SCompSupporter cs = {pTableList, posList, pSupporter};
SLoserTreeInfo *pTree = NULL;
SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfMeters, &cs, tableResultComparFn);
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
resetMergeResultBuf(pQuery, pCtx);
SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo));
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
int64_t lastTimestamp = -1;
int64_t startt = taosGetTimestampMs();
while (1) {//todo add iterator
while (1) {
int32_t pos = pTree->pNode[0].index;
Position * position = &cs.pPosition[pos];
// SQueryDiskbasedResultBuf *pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf;
// tFilePage *pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
SWindowResInfo *pWindowResInfo = &pTableList[pos]->pMeterQInfo->windowResInfo;
SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, position->rowIdx);
SWindowResult * pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]);
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes);
TSKEY ts = GET_INT64_VAL(b);
// int64_t ts = getCurrentTimestamp(&cs, pos);
assert(ts > 0 && ts == pWindowRes->window.skey);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
if (num <= 0) {
cs.pPosition[pos].rowIdx += 1;
cs.position[pos] += 1;
if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) {
cs.pPosition[pos].rowIdx = -1;
if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1;
// all input sources are exhausted
if (--numOfMeters == 0) {
@ -5727,63 +5717,30 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
}
} else {
if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ true);
} else {
// copy data to disk buffer
doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer
if (buffer[0]->numOfElems == pQuery->pointsToRead) {
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
return -1;
}
resetMergeResultBuf(pQuery, pCtx);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
}
// pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
// if (pPage->numOfElems <= 0) { // current source data page is empty
// do nothing
// } else {
doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ false);
doMerge(pRuntimeEnv, ts, pWindowRes, false);
buffer[0]->numOfElems += 1;
// }
}
lastTimestamp = ts;
if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) {
cs.pPosition[pos].rowIdx = -1;
cs.position[pos] += 1;
if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1;
// all input sources are exhausted
if (--numOfMeters == 0) {
break;
}
// if (cs.pPosition[pos].rowIdx >= pPage->numOfElems - 1) {
// cs.pPosition[pos].rowIdx = 0;
// cs.pPosition[pos].pageIdx += 1; // try next page
//
// // check if current page is empty or not. if it is empty, ignore it and try next
// SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid);
// if (cs.pPosition[pos].pageIdx <= list.size - 1) {
// tFilePage *newPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
//
// // if current source data page is null, it must be the last page of source output page
// if (newPage->numOfElems <= 0) {
// cs.pPosition[pos].pageIdx += 1;
// assert(cs.pPosition[pos].pageIdx >= list.size - 1);
// }
// }
//
// // the following code must be executed if current source pages are exhausted
// if (cs.pPosition[pos].pageIdx >= list.size) {
// cs.pPosition[pos].pageIdx = -1;
// cs.pPosition[pos].rowIdx = -1;
//
// // all input sources are exhausted
// if (--numOfMeters == 0) {
// break;
// }
// }
} else {
cs.pPosition[pos].rowIdx += 1;
}
}
@ -5797,6 +5754,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
tfree(pTree);
tfree(pTableList);
tfree(posList);
tfree(pResultInfo);
return -1;
}
@ -5812,6 +5770,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
tfree(pTree);
tfree(pTableList);
tfree(posList);
tfree(pResultInfo);
pSupporter->offset = 0;
@ -5855,11 +5814,13 @@ int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQue
return TSDB_CODE_SUCCESS;
}
void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx) {
void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes;
pCtx[k].size = 1;
pCtx[k].startOffset = 0;
pCtx[k].resultInfo = &pResultInfo[k];
pQuery->sdata[k]->len = 0;
}
}
@ -5873,7 +5834,7 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3
void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->intervalTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && isIntervalQuery(pQuery))) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
}
@ -5936,11 +5897,8 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
pResultRow->pos = *posInfo;
for (int32_t i = 0; i < numOfCols; ++i) {
SResultInfo *pResultInfo = &pResultRow->resultInfo[i];
size_t size = pQuery->pSelectExpr[i].interResBytes;
setResultInfoBuf(pResultInfo, (int32_t)size, isSTableQuery);
}
// set the intermediate result output buffer
setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery);
}
void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) {
@ -6116,24 +6074,17 @@ typedef struct SQueryStatus {
SPositionInfo start;
SPositionInfo next;
SPositionInfo end;
TSKEY skey;
TSKEY ekey;
int8_t overStatus;
TSKEY lastKey;
STSCursor cur;
} SQueryStatus;
// todo refactor
static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pStatus->overStatus = pQuery->over;
pStatus->lastKey = pQuery->lastKey;
pStatus->skey = pQuery->skey;
pStatus->ekey = pQuery->ekey;
pStatus->start = pRuntimeEnv->startPos;
pStatus->next = pRuntimeEnv->nextPos;
pStatus->end = pRuntimeEnv->endPos;
@ -6157,9 +6108,6 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pStatus->lastKey;
pQuery->skey = pStatus->skey;
pQuery->ekey = pStatus->ekey;
pQuery->over = pStatus->overStatus;
pRuntimeEnv->startPos = pStatus->start;
@ -6213,20 +6161,11 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
}
}
void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
bool toContinue = false;
/* store the start query position */
savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos);
int64_t skey = pQuery->lastKey;
while (1) {
doScanAllDataBlocks(pRuntimeEnv);
bool toContinue = true;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
@ -6242,7 +6181,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]);
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
toContinue &= (pResInfo->complete);
toContinue |= (!pResInfo->complete);
}
}
} else {
@ -6250,21 +6189,40 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]);
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
toContinue &= (pResInfo->complete);
toContinue |= (!pResInfo->complete);
}
}
if (toContinue) {
return toContinue;
}
void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
/* store the start query position */
savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos);
int64_t skey = pQuery->lastKey;
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
while (1) {
doScanAllDataBlocks(pRuntimeEnv);
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
break;
}
// set the correct start position, and load the corresponding block in buffer if required.
TSKEY actKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos);
assert((QUERY_IS_ASC_QUERY(pQuery) && actKey >= pQuery->skey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && actKey <= pQuery->skey));
/*
* set the correct start position, and load the corresponding block in buffer for next
* round scan all data blocks.
*/
TSKEY key = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos);
assert((QUERY_IS_ASC_QUERY(pQuery) && key >= pQuery->skey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && key <= pQuery->skey));
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pQuery->lastKey = pQuery->skey;
pRuntimeEnv->scanFlag = REPEAT_SCAN;
/* check if query is killed or not */
if (isQueryKilled(pQuery)) {
@ -6285,7 +6243,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
@ -6361,7 +6319,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
*/
// void forwardIntervalQueryRange(STableQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) {
// SQuery *pQuery = pRuntimeEnv->pQuery;
// if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) {
// if (pQuery->slidingTime > 0 && isIntervalQuery(pQuery)) {
// if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->ekey) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) {
// setQueryStatus(pQuery, QUERY_COMPLETED);
@ -7067,21 +7025,6 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S
return 0;
}
// int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) {
// assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1);
//
// TSKEY key = -1;
// if (IS_DATA_BLOCK_LOADED(blockStatus)) {
// key = pPrimaryCol[pQuery->pos];
// } else { // while the data block is not loaded, the position must be the first or last position
// assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0);
// key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast;
// }
//
// assert((key >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (key <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery)));
// return key;
//}
/*
* There are two cases to handle:
*
@ -7130,7 +7073,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey);
pWindowResInfo->startTime = windowSKey;
assert(pWindowResInfo->startTime > 0);
// assert(pWindowResInfo->startTime > 0);
if (pWindowResInfo->prevSKey == 0) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
@ -7213,7 +7156,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus);
}
if (pRuntimeEnv->pTSBuf > 0 || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
if (pRuntimeEnv->pTSBuf > 0 || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) {
req |= BLK_DATA_ALL_NEEDED;
}
}
@ -7296,14 +7239,14 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
}
bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) {
return (pQuery->intervalTime == 0) || ((queryRangeSet == 1) && (pQuery->intervalTime > 0));
return (pQuery->intervalTime == 0) || ((queryRangeSet == 1) && (isIntervalQuery(pQuery)));
}
static int32_t getNumOfSubset(STableQuerySupportObj *pSupporter) {
SQuery *pQuery = pSupporter->runtimeEnv.pQuery;
int32_t totalSubset = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0)) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pSupporter->runtimeEnv.windowResInfo);
} else {
totalSubset = pSupporter->pSidSet->numOfSubSet;
@ -7433,7 +7376,6 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn
assert(numOfRes >= 0);
// todo merge refactor
updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo);
updatelastkey(pQuery, pMeterQueryInfo);
}

View File

@ -1091,18 +1091,15 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
while (1) {
initCtxOutputBuf(pRuntimeEnv);
vnodeScanAllData(pRuntimeEnv);
if (isQueryKilled(pQuery)) {
return;
}
assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_NOT_COMPLETED));
doFinalizeResult(pRuntimeEnv);
// int64_t maxOutput = getNumOfResult(pRuntimeEnv);
// here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query
if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 &&
@ -1113,30 +1110,17 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
int32_t c = MIN(numOfClosed, pQuery->limit.offset);
clearFirstNTimeWindow(pRuntimeEnv, c);
pQuery->limit.offset -= c;
} else {
// pQuery->pointsRead += maxOutput;
// forwardCtxOutputBuf(pRuntimeEnv, maxOutput);
}
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
break;
}
// load the data block for the next retrieve
loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
break;
}
// /*
// * the scan limitation mechanism is upon here,
// * 1. since there is only one(k) record is generated in one scan operation
// * 2. remain space is not sufficient for next query output, abort
// */
// if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) ||
// ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) {
// setQueryStatus(pQuery, QUERY_RESBUF_FULL);
// break;
// }
}
}
@ -1262,7 +1246,9 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
// here we have scan all qualified data in both data file and cache
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
// continue to get push data from the group result
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->intervalTime > 0) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
(pQuery->intervalTime > 0 && pQInfo->pointsReturned < pQuery->limit.limit)) {
//todo limit the output for interval query?
pQuery->pointsRead = 0;
pSupporter->subgroupIdx = 0; // always start from 0

View File

@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdint.h>
#include "os.h"
#include "taosmsg.h"
#include "textbuffer.h"
@ -118,14 +115,9 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p
if (numOfAvailRawData > 0) {
int32_t finalNumOfResult = 0;
// if (pInterpoInfo->order == TSQL_SO_ASC) {
// get last timestamp, calculate the result size
int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1];
finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1;
// } else { // todo error less than one!!!
// TSKEY lastKey = pPrimaryKeyArray[0];
// finalNumOfResult = (int32_t)((pInterpoInfo->startTimestamp - lastKey) / nInterval) + 1;
// }
assert(finalNumOfResult >= numOfAvailRawData);
return finalNumOfResult;
@ -140,7 +132,9 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p
}
}
bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) { return taosNumOfRemainPoints(pInterpoInfo) > 0; }
bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) {
return taosNumOfRemainPoints(pInterpoInfo) > 0;
}
int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) {
if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) {
@ -197,28 +191,22 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0;
}
static char* getPos(char* data, int32_t bytes, int32_t order, int32_t capacity, int32_t index) {
// if (order == TSQL_SO_ASC) {
return data + index * bytes;
// } else {
// return data + (capacity - index - 1) * bytes;
// }
}
static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; }
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, int32_t start,
int32_t capacity, int32_t num) {
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order,
int32_t start, int32_t capacity, int32_t num) {
for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, order, capacity, num);
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
assignVal(val1, pTags[j], pSchema->bytes, pSchema->type);
}
}
static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval, int64_t* defaultVal,
int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, char** pTags,
bool outOfBound) {
SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval,
int64_t* defaultVal, int64_t currentTimestamp, int32_t capacity, int32_t numOfTags,
char** pTags, bool outOfBound) {
char** prevValues = &pInterpoInfo->prevValues;
char** nextValues = &pInterpoInfo->nextValues;
@ -226,7 +214,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order);
char* val = getPos(data[0]->data, TSDB_KEYSIZE, pInterpoInfo->order, capacity, *num);
char* val = getPos(data[0]->data, TSDB_KEYSIZE, *num);
*(TSKEY*)val = pInterpoInfo->startTimestamp;
int32_t numOfValCols = pModel->numOfCols - numOfTags;
@ -239,7 +227,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
SSchema* pSchema = getColumnModelSchema(pModel, i);
int16_t offset = getColumnModelOffset(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
if (isNull(pInterpolationData + offset, pSchema->type)) {
setNull(val1, pSchema->type, pSchema->bytes);
@ -251,7 +239,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
setNull(val1, pSchema->type, pSchema->bytes);
}
}
@ -265,7 +253,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
int16_t offset = getColumnModelOffset(pModel, i);
int16_t type = pSchema->type;
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, type, pSchema->bytes);
@ -284,7 +272,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
setNull(val1, pSchema->type, pSchema->bytes);
}
@ -294,7 +282,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
}
@ -360,6 +348,9 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
}
}
if (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) &&
num < outputRows) {
while (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) &&
num < outputRows) {
@ -373,8 +364,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo;
return outputRows;
}
if (pInterpoInfo->startTimestamp == currentTimestamp) {
} else {
// if (pInterpoInfo->startTimestamp == currentTimestamp) {
if (*prevValues == NULL) {
*prevValues = calloc(1, pModel->rowSize);
for (int i = 1; i < pModel->numOfCols; i++) {
@ -391,14 +382,13 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, bufSize, num);
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
if (i == 0 ||
(functionIDs[i] != TSDB_FUNC_COUNT &&
!isNull(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->type)) ||
(functionIDs[i] == TSDB_FUNC_COUNT &&
*(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes) != 0)) {
assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes);
} else { // i > 0 and isNULL, do interpolation
@ -416,11 +406,11 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
/* set the tag value for final result */
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, pModel->numOfCols - numOfTags, bufSize,
num);
}
pInterpoInfo->startTimestamp += (nInterval * step);
pInterpoInfo->rowIdx += 1;
num += 1;
}
if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->rowIdx < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || num >= outputRows) {