diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index e2342870df..fe8586edba 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1526,7 +1526,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR // in the first scan, new space needed for results int32_t pageId = -1; SIDList list = getDataBufPagesIdList(pResultBuf, sid); - + if (list.size == 0) { pData = getNewDataBuf(pResultBuf, sid, &pageId); } else { @@ -1550,7 +1550,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR pWindowRes->pos.pageId = pageId; pWindowRes->pos.rowId = pData->numOfElems++; } - + return 0; } @@ -1564,8 +1564,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes return -1; } - // not assign result buffer yet - // todo refactor + // not assign result buffer yet, add new result buffer if (pWindowRes->pos.pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage); if (ret != 0) { @@ -1604,6 +1603,45 @@ static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t s return forwardStep; } +static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { // query completed + if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + closeAllSlidingWindow(pWindowResInfo); + + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); + } else { + int32_t i = 0; + int64_t skey = 0; + + for (i = 0; i < pWindowResInfo->size; ++i) { + SWindowResult *pResult = &pWindowResInfo->pResult[i]; + + if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || + (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { + closeSlidingWindow(pWindowResInfo, i); + } else { + skey = pResult->window.skey; + break; + } + } + + pWindowResInfo->prevSKey = skey; + + // the number of completed slots are larger than the threshold, dump to client immediately. + int32_t v = numOfClosedSlidingWindow(pWindowResInfo); + if (v > pWindowResInfo->threshold) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); + } + + dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); + } + } +} + /** * * @param pRuntimeEnv @@ -1660,18 +1698,29 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery); int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - // continue; + return 0; } - if (win.ekey < pBlockInfo->keyLast) { - forwardStep = - getForwardStepsInBlock(pBlockInfo->size, searchFn, win.ekey, pQuery->pos, pQuery->order.order, primaryKeyCol); + if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor + if (win.ekey < pBlockInfo->keyLast) { + forwardStep = + getForwardStepsInBlock(pBlockInfo->size, searchFn, win.ekey, pQuery->pos, pQuery->order.order, primaryKeyCol); + } else { + forwardStep = pBlockInfo->size - pQuery->pos; + } + } else { + if (win.skey > pBlockInfo->keyFirst) { + forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, win.skey, pQuery->pos, pQuery->order.order, primaryKeyCol); + } else { + forwardStep = pQuery->pos + 1; + } } for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { pCtx[k].nStartQueryTimestamp = win.skey; pCtx[k].size = forwardStep; - + pCtx[k].startOffset = pQuery->pos; + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); @@ -1683,6 +1732,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t while (1) { getNextLogicalQueryRange(pRuntimeEnv, &nextWin); + if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { pWindowResInfo->curIndex = index; @@ -1695,43 +1745,67 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t break; } - if (pBlockInfo->keyFirst <= nextWin.skey && pBlockInfo->keyLast >= nextWin.skey) { - int32_t startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.skey, TSQL_SO_ASC); + // if (pBlockInfo->keyLast >= nextWin.skey && pBlockInfo->keyFirst <= nextWin.ekey) { + int32_t startPos = -1; + if (QUERY_IS_ASC_QUERY(pQuery)) { + startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.skey, TSQL_SO_ASC); + } else { + startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.ekey, TSQL_SO_DESC); + } + + /* + * This time window does not cover any data, try next time window + * when the time window is too small, this case may happen + */ + if ((primaryKeyCol[startPos] > nextWin.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) { + continue; + } - // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) != - TSDB_CODE_SUCCESS) { - pRuntimeEnv->windowResInfo.curIndex = index; - break; - } + // null data, failed to allocate more memory buffer + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) != + TSDB_CODE_SUCCESS) { + pRuntimeEnv->windowResInfo.curIndex = index; + break; + } + if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor if (nextWin.ekey < pBlockInfo->keyLast) { - forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.ekey, startPos, pQuery->order.order, - primaryKeyCol); + forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.ekey, startPos, pQuery->order.order, primaryKeyCol); } else { forwardStep = pBlockInfo->size - startPos; } + } else { + if (nextWin.skey > pBlockInfo->keyFirst) { + forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.skey, startPos, pQuery->order.order, primaryKeyCol); + } else { + forwardStep = startPos + 1; + } + } - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - pCtx[k].nStartQueryTimestamp = win.skey; - pCtx[k].size = forwardStep; + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + pCtx[k].nStartQueryTimestamp = nextWin.skey; + pCtx[k].size = forwardStep; + pCtx[k].startOffset = startPos; - SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); - if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { - continue; - } - - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunction(&pCtx[k]); - } + SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo)); + if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { + continue; } - } else { - pWindowResInfo->curIndex = index; - break; + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunction(&pCtx[k]); + } } + + // } else { + // pWindowResInfo->curIndex = index; + // break; + // } } + + pWindowResInfo->curIndex = index; } else { /* * the sqlfunctionCtx parameters should be set done before all functions are invoked, @@ -1746,49 +1820,22 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } } - int64_t lastKey = pBlockInfo->keyLast; - if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { - // query completed - if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeAllSlidingWindow(pWindowResInfo); + TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; + doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); - } else { - int32_t i = 0; - int64_t skey = 0; - - for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; - - if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeSlidingWindow(pWindowResInfo, i); - } else { - skey = pResult->window.skey; - break; - } - } - - pWindowResInfo->prevSKey = skey; - - // the number of completed slots are larger than the threshold, dump to client immediately. - int32_t v = numOfClosedSlidingWindow(pWindowResInfo); - if (v > pWindowResInfo->threshold) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } - - dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); - } + /* + * No need to calculate the number of output results for group-by normal columns, interval query + * because the results of group by normal column is put into intermediate buffer. + */ + int32_t num = 0; + if (pQuery->intervalTime == 0 && pQuery->slidingTime == 0) { + num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } - int64_t numOfIncrementRes = 0 /*getNumOfResult(pRuntimeEnv) - prevNumOfRes*/; - validateTimestampForSupplementResult(pRuntimeEnv, numOfIncrementRes); + validateTimestampForSupplementResult(pRuntimeEnv, num); tfree(sasArray); - - return (int32_t)numOfIncrementRes; + return (int32_t)num; } /** @@ -2275,44 +2322,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * free(sasArray); - if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { - // query completed - if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeAllSlidingWindow(pWindowResInfo); - - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); - } else { - int32_t i = 0; - int64_t skey = 0; - - for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; - - if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeSlidingWindow(pWindowResInfo, i); - } else { - skey = pResult->window.skey; - break; - } - } - - pWindowResInfo->prevSKey = skey; - - // the number of completed slots are larger than the threshold, dump to client immediately. - int32_t v = numOfClosedSlidingWindow(pWindowResInfo); - if (v > pWindowResInfo->threshold) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } - - dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); - } - } + lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; + doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); /* - * No need to calculate the number of output results for groupby normal columns + * No need to calculate the number of output results for group-by normal columns, interval query * because the results of group by normal column is put into intermediate buffer. */ int32_t num = 0; @@ -3481,6 +3495,70 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); } +static void getActualRange(SMeterQuerySupportObj *pSupporter, STimeWindow *pTimeWindow) { + SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; + __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; + + int32_t order = pQuery->order.order; + SWAP(pQuery->skey, pQuery->ekey, TSKEY); + pQuery->lastKey = pQuery->skey; + + if (QUERY_IS_ASC_QUERY(pQuery)) { // do the desc check first for asc query + pQuery->order.order ^= 1; + + TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + if (t > 0) { + pTimeWindow->ekey = t; + } else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) { + pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + } + + pQuery->order.order = order; + SWAP(pQuery->skey, pQuery->ekey, TSKEY); + pQuery->lastKey = pQuery->skey; + + if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) { + pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + } else { // set no data in file + pQuery->fileId = -1; + pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + } + + pQuery->skey = pTimeWindow->skey; + pQuery->ekey = pTimeWindow->ekey; + } else { + pQuery->order.order ^= 1; + + if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) { + pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + } else { // set no data in file + pQuery->fileId = -1; + pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + } + + // reverse check for maxValue in query range + SWAP(pQuery->skey, pQuery->ekey, TSKEY); + pQuery->order.order ^= 1; + + // set no data in file + pQuery->lastKey = pQuery->skey; + + TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + if (t > 0) { + pTimeWindow->ekey = t; + } else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) { + pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + } + + pQuery->ekey = pTimeWindow->skey; + pQuery->skey = pTimeWindow->ekey; + } + + pQuery->order.order = order; +} + /** * determine the first query range, according to raw query range [skey, ekey] and group-by interval. * the time interval for aggregating is not enforced to check its validation, the minimum interval is not less than @@ -4523,55 +4601,26 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete pointInterpSupporterDestroy(&interpInfo); return TSDB_CODE_SUCCESS; } - } else { - // find the skey and ekey in case of sliding query - // todo refactor + } else { // find the skey and ekey in case of sliding query if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) { - int64_t skey = 0; - - SWAP(pQuery->skey, pQuery->ekey, int64_t); - pQuery->order.order ^= 1; - pQuery->lastKey = pQuery->skey; - - if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) { - sem_post(&pQInfo->dataReady); - pQInfo->over = 1; - - pointInterpSupporterDestroy(&interpInfo); - return TSDB_CODE_SUCCESS; - } - - pQuery->skey = skey; - - pQuery->order.order ^= 1; - SWAP(pQuery->skey, pQuery->ekey, int64_t); - - int64_t ekey = 0; - pQuery->lastKey = pQuery->skey; - if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) { - // - } - - pQuery->skey = ekey; + STimeWindow win = {0}; + getActualRange(pSupporter, &win); TSKEY skey1, ekey1; TSKEY windowSKey = 0, windowEKey = 0; - TSKEY minKey = MIN(pQuery->skey, pQuery->ekey); - TSKEY maxKey = MAX(pQuery->skey, pQuery->ekey); - - doGetAlignedIntervalQueryRangeImpl(pQuery, minKey, minKey, maxKey, &skey1, &ekey1, &windowSKey, &windowEKey); + doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, + &windowEKey); pRuntimeEnv->windowResInfo.startTime = windowSKey; - pSupporter->rawSKey = pQuery->skey; - pSupporter->rawEKey = pQuery->ekey; - if (QUERY_IS_ASC_QUERY(pQuery)) { pRuntimeEnv->windowResInfo.prevSKey = windowSKey; } else { pRuntimeEnv->windowResInfo.prevSKey = - windowSKey + ((pQuery->skey - windowSKey) / pQuery->slidingTime) * pQuery->slidingTime; + windowSKey + ((win.ekey - windowSKey) / pQuery->slidingTime) * pQuery->slidingTime; } + + pQuery->over = QUERY_NOT_COMPLETED; } else { int64_t ekey = 0; if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) || @@ -5093,7 +5142,6 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); pTimeWindow->skey += (pQuery->slidingTime * factor); @@ -5853,13 +5901,11 @@ void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { pQuery->order.order = (pQuery->order.order ^ 1); } -// todo dynamically add new slots void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { int32_t numOfCols = pQuery->numOfOutputCols; pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo)); - pResultRow->pos = - *posInfo; // page->data + (pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + page->numOfElems*s1; + pResultRow->pos = *posInfo; for (int32_t i = 0; i < numOfCols; ++i) { SResultInfo *pResultInfo = &pResultRow->resultInfo[i]; @@ -7073,15 +7119,15 @@ void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t int32_t setOutputBufferForIntervalQuery(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) { SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SWindowResInfo* pWindowResInfo = &pMeterQueryInfo->windowResInfo; - + SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo; + STimeWindow win = getActiveSlidingWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery); SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE); if (pWindowRes == NULL) { return -1; } - + // not allocated yet, allocate new buffer if (pWindowRes->pos.pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, pMeterQueryInfo->sid, pRuntimeEnv->numOfRowsPerPage);