diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1160f330c5..b395b5e1b8 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -993,70 +993,6 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) return ekey; } -#if 0 -//todo binary search -static UNUSED_FUNC void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { - int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock); - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData *p = taosArrayGet(pDataBlock, i); - if (colId == p->info.colId) { - return p->pData; - } - } - - return NULL; -} - -// todo refactor -static UNUSED_FUNC char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { - if (pDataBlock == NULL) { - return NULL; - } - - char *dataBlock = NULL; - int32_t functionId = pQuery->pExpr1[col].base.functionId; - if (functionId == TSDB_FUNC_ARITHM) { - sas->pArithExpr = &pQuery->pExpr1[col]; - sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1); - sas->colList = pQuery->colList; - sas->numOfCols = pQuery->numOfCols; - - // here the pQuery->colList and sas->colList are identical - int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - SColumnInfo *pColMsg = &pQuery->colList[i]; - - dataBlock = NULL; - for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor - SColumnInfoData *p = taosArrayGet(pDataBlock, k); - if (pColMsg->colId == p->info.colId) { - dataBlock = p->pData; - break; - } - } - - assert(dataBlock != NULL); - sas->data[i] = dataBlock; // start from the offset - } - - } else { // other type of query function - SColIndex *pCol = &pQuery->pExpr1[col].base.colInfo; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { - SColIndex* pColIndex = &pQuery->pExpr1[col].base.colInfo; - SColumnInfoData *p = taosArrayGet(pDataBlock, pColIndex->colIndex); - assert(p->info.colId == pColIndex->colId); - - dataBlock = p->pData; - } else { - dataBlock = NULL; - } - } - - return dataBlock; -} -#endif - static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) { if (type == RESULT_ROW_START_INTERP) { for (int32_t k = 0; k < numOfOutput; ++k) { @@ -1491,131 +1427,6 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat } } -#if 0 -/** - * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions - * @param pRuntimeEnv - * @param forwardStep - * @param tsCols - * @param pFields - * @param isDiskFileBlock - * @return the incremental number of output value, so it maybe 0 for fixed number of query, - * such as count/min/max etc. - */ -static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - SResultRowInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { - SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); - - SQuery *pQuery = pRuntimeEnv->pQuery; - int64_t groupId = pQuery->current->groupIndex; - - TSKEY *tsCols = NULL; - if (pDataBlock != NULL) { - SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); - tsCols = (TSKEY *)(pColInfo->pData); - } - - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); - } - - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - int32_t prevIndex = curTimeWindowIndex(pWindowResInfo); - - TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); - STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - - SResultRow* pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - goto _end; - } - - int32_t forwardStep = 0; - int32_t startPos = pQuery->pos; - - TSKEY ekey = reviseWindowEkey(pQuery, &win); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); - - // prev time window not interpolation yet. - int32_t curIndex = curTimeWindowIndex(pWindowResInfo); - if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) { - for(int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. - SResultRow *pRes = pWindowResInfo->pResult[j]; - if (pRes->closed) { - assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); - continue; - } - - STimeWindow w = pRes->win; - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId); - assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - - int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1; - doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - - doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows, pDataBlock); - } - - // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); - assert (ret == TSDB_CODE_SUCCESS); - } - - // window start key interpolation - doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep); - doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock); - - STimeWindow nextWin = win; - while (1) { - int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos); - if (startPos < 0) { - break; - } - - // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - break; - } - - ekey = reviseWindowEkey(pQuery, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); - - // window start(end) key interpolation - doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep); - doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock); - } - - } else { - /* - * the sqlfunctionCtx parameters should be set done before all functions are invoked, - * since the selectivity + tag_prj query needs all parameters been set done. - * tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY - */ - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pExpr1[k].base.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - pCtx[k].startTs = pQuery->window.skey; - aAggs[functionId].xFunction(&pCtx[k]); - } - } - } - - _end: - if (pQuery->timeWindowInterpo) { - int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0; - saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex); - } -} -#endif - static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* rowCellInfoOffset) { SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; @@ -1754,315 +1565,6 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return true; } - -#if 0 -static UNUSED_FUNC void setTimeWindowSKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); - if (!done) { - TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey; - if (key == ts) { - setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - } else if (prevTs != INT64_MIN && ((QUERY_IS_ASC_QUERY(pQuery) && prevTs < key) || (!QUERY_IS_ASC_QUERY(pQuery) && prevTs > key))) { - doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_START_INTERP); - setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); - } else { - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - } - - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP); - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - pRuntimeEnv->pCtx[k].size = 1; - } - } else { - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - } -} - -static UNUSED_FUNC void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey; - doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_END_INTERP); - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - - setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - pRuntimeEnv->pCtx[i].size = 0; - } -} - - -static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - SResultRowInfo *pWindowResInfo, SArray *pDataBlock) { - SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); - - SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo* item = pQuery->current; - - int64_t groupId = item->groupIndex; - - SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); - - TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; - bool groupbyColumnValue = pQuery->groupbyColumn; - - int16_t type = 0; - int16_t bytes = 0; - - char *groupbyColumnData = NULL; - if (groupbyColumnValue) { - groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); - } - - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); - pCtx[k].size = 1; - } - - // set the input column data - for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { - SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; - pFilterInfo->pData = getDataBlockImpl(pDataBlock, pFilterInfo->info.colId); - assert(pFilterInfo->pData != NULL); - } - - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - // from top to bottom in desc - // from bottom to top in asc order - if (pRuntimeEnv->pTsBuf != NULL) { - qDebug("QInfo:%p process data rows, numOfRows:%d, query order:%d, ts comp order:%d", pQInfo, pDataBlockInfo->rows, - pQuery->order.order, pRuntimeEnv->pTsBuf->cur.order); - } - - int32_t offset = -1; - TSKEY prevTs = *(TSKEY*) pRuntimeEnv->prevRow[0]; - int32_t prevRowIndex = -1; - - for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) { - offset = GET_COL_DATA_POS(pQuery, j, step); - - if (pRuntimeEnv->pTsBuf != NULL) { - int32_t ret = doTSJoinFilter(pRuntimeEnv, offset); - if (ret == TS_JOIN_TAG_NOT_EQUALS) { - break; - } else if (ret == TS_JOIN_TS_NOT_EQUALS) { - continue; - } else { - assert(ret == TS_JOIN_TS_EQUAL); - } - } - - if (pQuery->numOfFilterCols > 0 && (!doFilterData(pQuery, offset))) { - continue; - } - - // interval window query, decide the time window according to the primary timestamp - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo); - int64_t ts = tsCols[offset]; - - STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - - SResultRow* pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // null data, too many state code - goto _end; - } - - // window start key interpolation - if (pQuery->timeWindowInterpo) { - // check for the time window end time interpolation - int32_t curIndex = curTimeWindowIndex(pWindowResInfo); - if (prevWindowIndex != -1 && prevWindowIndex < curIndex) { - for (int32_t k = prevWindowIndex; k < curIndex; ++k) { - SResultRow *pRes = pWindowResInfo->pResult[k]; - if (pRes->closed) { - assert(resultRowInterpolated(pResult, RESULT_ROW_START_INTERP) && resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - continue; - } - - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId); - assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - - setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win); - doRowwiseApplyFunctions(pRuntimeEnv, &pRes->win, offset); - } - - // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - continue; - } - } - - setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win); - } - - doRowwiseApplyFunctions(pRuntimeEnv, &win, offset); - int32_t index = pWindowResInfo->curIndex; - - STimeWindow nextWin = win; - while (1) { - getNextTimeWindow(pQuery, &nextWin); - if ((nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (nextWin.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - break; - } - - if (ts < nextWin.skey || ts > nextWin.ekey) { - break; - } - - // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - break; - } - - setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin); - doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset); - } - - // restore the index, add the result row will move the index - pWindowResInfo->curIndex = index; - } else { // other queries - // decide which group this rows belongs to according to current state value - char* val = NULL; - if (groupbyColumnValue) { - val = groupbyColumnData + bytes * offset; - if (isNull(val, type)) { // ignore the null value - continue; - } - - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, val, type, bytes, item->groupIndex); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); - } - } - - if (pQuery->stabledev) { - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pExpr1[k].base.functionId; - if (functionId != TSDB_FUNC_STDDEV_DST) { - continue; - } - - pRuntimeEnv->pCtx[k].param[0].arr = NULL; - pRuntimeEnv->pCtx[k].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int - - // todo opt perf - int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); - for (int32_t i = 0; i < numOfGroup; ++i) { - SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, i); - if (memcmp(p->tags, val, bytes) == 0) { - int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); - for (int32_t f = 0; f < numOfCols; ++f) { - SStddevInterResult *pres = taosArrayGet(p->pResult, f); - if (pres->colId == pQuery->pExpr1[k].base.colInfo.colId) { - pRuntimeEnv->pCtx[k].param[0].arr = pres->pResult; - break; - } - } - } - } - } - } - - for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pExpr1[k].base.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); - } - } - } - - prevTs = tsCols[offset]; - prevRowIndex = offset; - - if (pRuntimeEnv->pTsBuf != NULL) { - // if timestamp filter list is empty, quit current query - if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) { - setQueryStatus(pQuery, QUERY_COMPLETED); - break; - } - } - } - - _end: - assert(offset >= 0 && tsCols != NULL); - if (prevTs != INT64_MIN && prevTs != *(int64_t*)pRuntimeEnv->prevRow[0]) { - assert(prevRowIndex >= 0); - item->lastKey = prevTs + step; - } - - // In case of all rows in current block are not qualified - if (pQuery->timeWindowInterpo && prevRowIndex != -1) { - saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, prevRowIndex); - } - - if (pRuntimeEnv->pTsBuf != NULL) { - item->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); - } -} - - - -static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, - SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - STableQueryInfo* pTableQueryInfo = pQuery->current; - SResultRowInfo* pResultRowInfo = &pRuntimeEnv->resultRowInfo; - -// if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) { -// rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock); -// } else { -// blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); -// } - - // update the lastkey of current table for projection/aggregation query - TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - // interval query with limit applied - int32_t numOfRes = 0; - if (QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->groupbyColumn) { - numOfRes = pResultRowInfo->size; - updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pQuery->timeWindowInterpo); - } else { // projection query - numOfRes = (int32_t) getNumOfResult(pRuntimeEnv); - - // update the number of output result - if (numOfRes > 0 && pQuery->checkResultBuf == 1) { - assert(numOfRes >= pRuntimeEnv->resultInfo.rows); - pRuntimeEnv->resultInfo.rows = numOfRes; - - if (numOfRes >= pRuntimeEnv->resultInfo.threshold) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } - - if ((pQuery->limit.limit >= 0) && (pQuery->limit.limit + pQuery->limit.offset) <= numOfRes) { - setQueryStatus(pQuery, QUERY_COMPLETED); - } - - if (((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) || - ((pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) { - setQueryStatus(pQuery, QUERY_COMPLETED); - } - } - } - - return numOfRes; -} - -#endif - void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex) { SDataStatis *pStatis = NULL; @@ -3284,108 +2786,6 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { return midPos; } - - -#if 0 - -static UNUSED_FUNC void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { - SQuery* pQuery = pRuntimeEnv->pQuery; - - if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL) { - STimeWindow w = TSWINDOW_INITIALIZER; - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - - if (QUERY_IS_ASC_QUERY(pQuery)) { - getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w); - pWindowResInfo->prevSKey = w.skey; - } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, pQuery->window.ekey, pBlockInfo->window.ekey, &w); - pWindowResInfo->prevSKey = w.skey; - } - } -} - - -static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo* pTableQueryInfo = pQuery->current; - - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); - SQueryCostInfo* summary = &pQInfo->summary; - - qDebug("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", - GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey, - pQuery->order.order); - - TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; - while (tsdbNextDataBlock(pQueryHandle)) { - summary->totalBlocks += 1; - - if (IS_MASTER_SCAN(pRuntimeEnv)) { - pQuery->numOfCheckedBlocks += 1; - } - - if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); - doSetInitialTimewindow(pRuntimeEnv, &blockInfo); - - // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block - ensureOutputBuffer(pRuntimeEnv, blockInfo.rows); - - SDataStatis *pStatis = NULL; - SArray * pDataBlock = NULL; - uint32_t status = 0; - - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); - if (ret != TSDB_CODE_SUCCESS) { - break; - } - - if (status == BLK_DATA_DISCARD) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; - continue; - } - - // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1; - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); - - summary->totalRows += blockInfo.rows; - qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv), - blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey); - - // while the output buffer is full or limit/offset is applied, query may be paused here - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) { - break; - } - } - - if (terrno != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, terrno); - } - - // if the result buffer is not full, set the query complete - if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - setQueryStatus(pQuery, QUERY_COMPLETED); - } - - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - closeAllResultRows(&pRuntimeEnv->resultRowInfo); - pRuntimeEnv->resultRowInfo.curIndex = pRuntimeEnv->resultRowInfo.size - 1; // point to the last time window - } - - return 0; -} - -#endif - /* * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer @@ -3732,45 +3132,11 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { } } - void switchCtxOrder(SQLFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { SWITCH_ORDER(pCtx[i].order); } } -#if 0 -void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - int32_t tid = 0; - int64_t uid = 0; - SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid); - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->pOutput = pQuery->sdata[i]->data; - - /* - * set the output buffer information and intermediate buffer - * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. - */ - SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i); - RESET_RESULT_INFO(pCellInfo); - pCtx->resultInfo = pCellInfo; - - // set the timestamp output buffer for top/bottom/diff query - int32_t functionId = pQuery->pExpr1[i].base.functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; - } - - memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pRuntimeEnv->resultInfo.capacity)); - } - - initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); -} - -#endif int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); @@ -3841,124 +3207,6 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { } } -#if 0 -void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - // reset the execution contexts - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pQuery->pExpr1[j].base.functionId; - assert(functionId != TSDB_FUNC_DIFF); - - // set next output position - if (IS_OUTER_FORWARD(aAggs[functionId].status)) { - pRuntimeEnv->pCtx[j].pOutput += pRuntimeEnv->pCtx[j].outputBytes * output; - } - - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - /* - * NOTE: for top/bottom query, the value of first column of output (timestamp) are assigned - * in the procedure of top/bottom routine - * the output buffer in top/bottom routine is ptsOutputBuf, so we need to forward the output buffer - * - * diff function is handled in multi-output function - */ - pRuntimeEnv->pCtx[j].ptsOutputBuf = (char*)pRuntimeEnv->pCtx[j].ptsOutputBuf + TSDB_KEYSIZE * output; - } - - RESET_RESULT_INFO(pRuntimeEnv->pCtx[j].resultInfo); - } -} - -void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - pCtx[j].currentStage = 0; - - SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); - if (pResInfo->initialized) { - continue; - } - - aAggs[pCtx[j].functionId].init(&pCtx[j]); - } -} - -void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->resultInfo.rows == 0 || pQuery->limit.offset == 0) { - return; - } - - if (pRuntimeEnv->resultInfo.rows <= pQuery->limit.offset) { - qDebug("QInfo:%p skip rows:%" PRId64 ", new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pRuntimeEnv->resultInfo.rows, - pQuery->limit.offset - pRuntimeEnv->resultInfo.rows); - - pQuery->limit.offset -= pRuntimeEnv->resultInfo.rows; - pRuntimeEnv->resultInfo.rows = 0; - - resetDefaultResInfoOutputBuf(pRuntimeEnv); - - // clear the buffer full flag if exists - CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL); - } else { - int64_t numOfSkip = pQuery->limit.offset; - pRuntimeEnv->resultInfo.rows -= numOfSkip; - pQuery->limit.offset = 0; - - qDebug("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip, - 0, pRuntimeEnv->resultInfo.rows); - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pExpr1[i].base.functionId; - int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - - memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pRuntimeEnv->resultInfo.rows * bytes)); - pRuntimeEnv->pCtx[i].pOutput = ((char*) pQuery->sdata[i]->data) + pRuntimeEnv->resultInfo.rows * bytes; - - if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; - } - } - - updateNumOfResult(pRuntimeEnv, (int32_t)pRuntimeEnv->resultInfo.rows); - } -} - -void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { - // for each group result, call the finalize function for each column - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pResult = getResultRow(pWindowResInfo, i); - - setResultOutputBuf(pRuntimeEnv, pResult); - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j]; - if (pCtx->functionId == TSDB_FUNC_TS) { // ignore more table - continue; - } - - aAggs[pCtx->functionId].xNextStep(pCtx); - } - } - } else { - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j]; - if (pCtx->functionId == TSDB_FUNC_TS) { - continue; - } - - aAggs[pCtx->functionId].xNextStep(pCtx); - } - } -} -#endif - void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); @@ -3970,7 +3218,6 @@ void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { } } - void setQueryStatus(SQuery *pQuery, int8_t status) { if (status == QUERY_NOT_COMPLETED) { pQuery->status = status; @@ -3981,134 +3228,6 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { } } -#if 0 -bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - bool toContinue = false; - if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { - // for each group result, call the finalize function for each column - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pResult = getResultRow(pWindowResInfo, i); - - setResultOutputBuf(pRuntimeEnv, pResult); - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int16_t functId = pQuery->pExpr1[j].base.functionId; - if (functId == TSDB_FUNC_TS) { - continue; - } - - aAggs[functId].xNextStep(&pRuntimeEnv->pCtx[j]); - SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); - - toContinue |= (!pResInfo->complete); - } - } - } else { - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int16_t functId = pQuery->pExpr1[j].base.functionId; - if (functId == TSDB_FUNC_TS) { - continue; - } - - aAggs[functId].xNextStep(&pRuntimeEnv->pCtx[j]); - SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); - - toContinue |= (!pResInfo->complete); - } - } - - return toContinue; -} - -static UNUSED_FUNC SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { - SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo* pTableQueryInfo = pQuery->current; - - assert((start <= pTableQueryInfo->lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (start >= pTableQueryInfo->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); - - SQueryStatusInfo info = { - .status = pQuery->status, - .windowIndex = pRuntimeEnv->resultRowInfo.curIndex, - .lastKey = start, - }; - - TIME_WINDOW_COPY(info.w, pQuery->window); - return info; -} - - -static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { - SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); - SQuery *pQuery = pRuntimeEnv->pQuery; - - pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); // save the cursor - if (pRuntimeEnv->pTsBuf) { - SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); - bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - assert(ret); - } - - // reverse order time range - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - SWITCH_ORDER(pQuery->order.order); - - if (QUERY_IS_ASC_QUERY(pQuery)) { - assert(pQuery->window.skey <= pQuery->window.ekey); - } else { - assert(pQuery->window.skey >= pQuery->window.ekey); - } - - SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - switchCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pRuntimeEnv); - setupQueryRangeForReverseScan(pRuntimeEnv); - - // clean unused handle - if (pRuntimeEnv->pSecQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - } - - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); - if (pRuntimeEnv->pSecQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } -} - - - -void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, SQLFunctionCtx* pCtx, - int32_t numOfOutput) { - SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t order = pQuery->order.order; - - // group by normal columns and interval query on normal table - if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { -// disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order, numOfOutput); - } else { // for simple result of table query, - for (int32_t j = 0; j < numOfOutput; ++j) { // todo refactor - int32_t functId = pCtx[j].functionId; - if (pCtx[j].resultInfo == NULL) { - continue; // resultInfo is NULL, means no data checked in previous scan - } - - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { - pCtx[j].resultInfo->complete = false; - } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { - pCtx[j].resultInfo->complete = true; - } - } - } -} -#endif - static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -4130,172 +3249,6 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow setupQueryRangeForReverseScan(pRuntimeEnv); } -#if 0 -static UNUSED_FUNC void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo* pTableQueryInfo = pQuery->current; - - SWITCH_ORDER(pQuery->order.order); - switchCtxOrder(pRuntimeEnv); - - tsBufSetCursor(pRuntimeEnv->pTsBuf, &pStatus->cur); - if (pRuntimeEnv->pTsBuf) { - pRuntimeEnv->pTsBuf->cur.order = pQuery->order.order; - } - - SET_MASTER_SCAN_FLAG(pRuntimeEnv); - - // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan - pTableQueryInfo->lastKey = pStatus->lastKey; - pQuery->status = pStatus->status; - - pTableQueryInfo->win = pStatus->w; - pQuery->window = pTableQueryInfo->win; -} - -static UNUSED_FUNC void restoreTimeWindow(STableGroupInfo* pTableGroupInfo, STsdbQueryCond* pCond) { - assert(pTableGroupInfo->numOfTables == 1); - SArray* pTableKeyGroup = taosArrayGetP(pTableGroupInfo->pGroupList, 0); - STableKeyInfo* pKeyInfo = taosArrayGet(pTableKeyGroup, 0); - pKeyInfo->lastKey = pCond->twindow.skey; -} - -static UNUSED_FUNC void handleInterpolationQuery(SQInfo* pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - - SQuery *pQuery = pRuntimeEnv->pQuery; - if (pQuery->numOfCheckedBlocks > 0 || !isPointInterpoQuery(pQuery)) { - return; - } - - SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_PREV_ROW); - SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_NEXT_ROW); - if (prev == NULL || next == NULL) { - return; - } - - // setup the pCtx->start/end info and calculate the interpolation value - SColumnInfoData *startTs = taosArrayGet(prev, 0); - SColumnInfoData *endTs = taosArrayGet(next, 0); - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - - int32_t functionId = pQuery->pExpr1[i].base.functionId; - SColIndex *pColIndex = &pQuery->pExpr1[i].base.colInfo; - - if (!TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { - aAggs[functionId].xFunction(pCtx); - continue; - } - - SColumnInfoData *p = taosArrayGet(prev, pColIndex->colIndex); - SColumnInfoData *n = taosArrayGet(next, pColIndex->colIndex); - - assert(p->info.colId == pColIndex->colId); - - pCtx->start.key = *(TSKEY *)startTs->pData; - pCtx->end.key = *(TSKEY *)endTs->pData; - - if (p->info.type != TSDB_DATA_TYPE_BINARY && p->info.type != TSDB_DATA_TYPE_NCHAR) { - GET_TYPED_DATA(pCtx->start.val, double, p->info.type, p->pData); - GET_TYPED_DATA(pCtx->end.val, double, n->info.type, n->pData); - } else { // string pointer - pCtx->start.ptr = p->pData; - pCtx->end.ptr = n->pData; - } - - pCtx->param[2].i64 = (int8_t)pQuery->fillType; - pCtx->startTs = pQuery->window.skey; - if (pQuery->fillVal != NULL) { - if (isNull((const char *)&pQuery->fillVal[i], pCtx->inputType)) { - pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; - } else { // todo refactor, tVariantCreateFromBinary should handle the NULL value - if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { - tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType); - } - } - } - - aAggs[functionId].xFunction(pCtx); - } -} - - -void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { - SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv); - SQuery *pQuery = pRuntimeEnv->pQuery; - STableQueryInfo *pTableQueryInfo = pQuery->current; - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - // store the start query position - SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start); - SET_MASTER_SCAN_FLAG(pRuntimeEnv); - - if (!pQuery->groupbyColumn && pQuery->hasTagResults) { - setTagVal(pRuntimeEnv, pTableQueryInfo->pTable); - } - - while (1) { -// doScanAllDataBlocks(pRuntimeEnv); - - if (pRuntimeEnv->scanFlag == MASTER_SCAN) { - qstatus.status = pQuery->status; - - // do nothing if no data blocks are found qualified during scan - if (qstatus.lastKey == pTableQueryInfo->lastKey) { - qDebug("QInfo:%p no results generated in this scan", pQInfo); - } - } - - if (!needRepeatScan(pRuntimeEnv)) { - // restore the status code and jump out of loop - if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { - pQuery->status = qstatus.status; - } - - break; - } - - if (pRuntimeEnv->pSecQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - } - - STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); - restoreTimeWindow(&pQuery->tableGroupInfo, &cond); - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); - if (pRuntimeEnv->pSecQueryHandle == NULL) { - longjmp(pRuntimeEnv->env, terrno); - } - - pRuntimeEnv->resultRowInfo.curIndex = qstatus.windowIndex; - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - pRuntimeEnv->scanFlag = REPEAT_SCAN; - - if (pRuntimeEnv->pTsBuf) { - bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - assert(ret); - } - - qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo, - cond.twindow.skey, cond.twindow.ekey); - } - - if (needReverseScan(pQuery)) { - setEnvBeforeReverseScan(pRuntimeEnv, &qstatus); - - // reverse scan from current position - qDebug("QInfo:%p start to reverse scan", pQInfo); -// doScanAllDataBlocks(pRuntimeEnv); - - clearEnvAfterReverseScan(pRuntimeEnv, &qstatus); - } - - handleInterpolationQuery(pQInfo); -} -#endif - void finalizeQueryResult_rv(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -4376,68 +3329,6 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { cleanupResultRowInfo(&pTableQueryInfo->resInfo); } -#if 0 -/** - * set output buffer for different group - * @param pRuntimeEnv - * @param pDataBlockInfo - */ -void setExecutionContext(SQueryRuntimeEnv *pRuntimeEnv, int32_t groupIndex, TSKEY nextKey) { - STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; - - // lastKey needs to be updated - pTableQueryInfo->lastKey = nextKey; - if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { - return; - } - - int64_t uid = 0; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, - sizeof(groupIndex), true, uid); - assert (pResultRow != NULL); - - /* - * not assign result buffer yet, add new result buffer - * all group belong to one result set, and each group result has different group id so set the id to be one - */ - if (pResultRow->pageId == -1) { - if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) != - TSDB_CODE_SUCCESS) { - return; - } - } - - // record the current active group id - pRuntimeEnv->prevGroupId = groupIndex; - setResultOutputBuf(pRuntimeEnv, pResultRow); - initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); -} - -void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - pCtx->pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, page); - - int32_t functionId = pQuery->pExpr1[i].base.functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { - pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; - } - - /* - * set the output buffer information and intermediate buffer, - * not all queries require the interResultBuf, such as COUNT - */ - pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i); - } -} -#endif - void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group @@ -4465,7 +3356,6 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } } - void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int32_t numOfOutput, int32_t groupIndex, TSKEY nextKey) { STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; @@ -4962,51 +3852,6 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } -#if 0 -int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; - - while (1) { - int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pRuntimeEnv->resultInfo.capacity); - - // todo apply limit output function - /* reached the start position of according to offset value, return immediately */ - if (pQuery->limit.offset == 0) { - qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret); - return ret; - } - - if (pQuery->limit.offset < ret) { - qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, remain:%" PRId64 ", new offset:%d", - pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0); - - ret -= (int32_t)pQuery->limit.offset; - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { //???pExpr1 or pExpr2 - memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset, - ret * pQuery->pExpr1[i].bytes); - } - - pQuery->limit.offset = 0; - return ret; - } else { - qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, " - "remain:%d, new offset:%" PRId64, pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0, - pQuery->limit.offset - ret); - - pQuery->limit.offset -= ret; - ret = 0; - } - - // no data in current data after fill - int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pRuntimeEnv->resultInfo.capacity); - if (numOfTotal == 0) { - return 0; - } - } -} -#endif - int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) { SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; @@ -6198,9 +5043,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { } } -#endif -#if 0 static int32_t doSaveContext(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -6265,88 +5108,6 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { // } } -#if 0 -static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; - - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - copyResToQueryResultBuf(pQInfo, pQuery); - } else { - copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - } - - qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total); - return; - } - - qDebug("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, - pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); - - // do check all qualified data blocks - int64_t el = scanMultiTableDataBlocks(pQInfo); - qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el); - - // query error occurred or query is killed, abort current execution - if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { - qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - // close all time window results - doCloseAllTimeWindow(pQInfo); - - if (needReverseScan(pQuery)) { - int32_t code = doSaveContext(pQInfo); - if (code == TSDB_CODE_SUCCESS) { - el = scanMultiTableDataBlocks(pQInfo); - qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el); - doRestoreContext(pQInfo); - } else { - pQInfo->code = code; - } - } else { - qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo); - } - - setQueryStatus(pQuery, QUERY_COMPLETED); - - if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { - qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); - //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead - longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { - copyResToQueryResultBuf(pQInfo, pQuery); - } else { // not a interval query - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0); - copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - } - - // handle the limitation of output buffer - qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total + pRuntimeEnv->resultInfo.rows); -} - -static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { - SArithmeticSupport *pSupport = (SArithmeticSupport *) param; - SExprInfo* pExprInfo = (SExprInfo*) pSupport->exprList; - - int32_t index = -1; - for (int32_t i = 0; i < pSupport->numOfCols; ++i) { - if (colId == pExprInfo[i].base.resColId) { - index = i; - break; - } - } - - assert(index >= 0 && index < pSupport->numOfCols); - return pSupport->data[index] + pSupport->offset * pExprInfo[index].bytes; -} - -#endif - static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { SSDataBlock *pBlock = &pTableScanInfo->block; SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; @@ -7422,23 +6183,6 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf return pOperator; } -/* - * in each query, this function will be called only once, no retry for further result. - * - * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; - * select count(*) from table_name group by status_column; - */ -void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - - SQuery *pQuery = pRuntimeEnv->pQuery; - if (!pQuery->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. - return; - } - - pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); -} - void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -8662,18 +7406,6 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo); } -size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; - - /* - * get the file size and set the numOfRows to be the file size, since for tsComp query, - * the returned row size is equalled to 1 - * TODO handle the case that the file is too large to send back one time - */ - return (size_t)(pQuery->resultRowSize * (*numOfRows)); -} - int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // the remained number of retrieved rows, not the interpolated result SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;