diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 9fa89f0415..1303a1fb6a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -110,7 +110,6 @@ typedef struct SFileBlockInfo { #define FUNCTION_COV 38 typedef struct SResultRowEntryInfo { -// int8_t hasResult:6; // result generated, not NULL value bool initialized:1; // output buffer has been initialized bool complete:1; // query has completed uint8_t isNullRes:6; // the result is null @@ -119,10 +118,10 @@ typedef struct SResultRowEntryInfo { // determine the real data need to calculated the result enum { - BLK_DATA_NO_NEEDED = 0x0, - BLK_DATA_STATIS_NEEDED = 0x1, - BLK_DATA_ALL_NEEDED = 0x3, - BLK_DATA_DISCARD = 0x4, // discard current data block since it is not qualified for filter + BLK_DATA_NOT_LOAD = 0x0, + BLK_DATA_SMA_LOAD = 0x1, + BLK_DATA_DATA_LOAD = 0x3, + BLK_DATA_FILTEROUT = 0x4, // discard current data block since it is not qualified for filter }; enum { diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 5f99af5460..772ff5e69a 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -46,6 +46,14 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms +enum { + RES_TYPE__QUERY = 1, + RES_TYPE__TMQ, +}; + +#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) +#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) + typedef struct SAppInstInfo SAppInstInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 20b7169895..c911aef45f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2004,7 +2004,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { bool hasFirstLastFunc = false; bool hasOtherFunc = false; - if (status == BLK_DATA_ALL_NEEDED || status == BLK_DATA_DISCARD) { + if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) { return status; } @@ -2023,11 +2023,11 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { } } - if (hasFirstLastFunc && status == BLK_DATA_NO_NEEDED) { + if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) { if (!hasOtherFunc) { - return BLK_DATA_DISCARD; + return BLK_DATA_FILTEROUT; } else { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } } @@ -2360,7 +2360,7 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVarian static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { SqlFunctionCtx* pCtx = pTableScanInfo->pCtx; - uint32_t status = BLK_DATA_NO_NEEDED; + uint32_t status = BLK_DATA_NOT_LOAD; int32_t numOfOutput = pTableScanInfo->numOfOutput; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -2369,11 +2369,11 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData // group by + first/last should not apply the first/last block filter if (functionId < 0) { - status |= BLK_DATA_ALL_NEEDED; + status |= BLK_DATA_DATA_LOAD; return status; } else { // status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); - // if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + // if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) { // return status; // } } @@ -2384,7 +2384,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - *status = BLK_DATA_NO_NEEDED; + *status = BLK_DATA_NOT_LOAD; pBlock->pDataBlock = NULL; pBlock->pBlockAgg = NULL; @@ -2397,36 +2397,15 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; #if 0 - if (pRuntimeEnv->pTsBuf != NULL) { - (*status) = BLK_DATA_ALL_NEEDED; - - if (pQueryAttr->stableQuery) { // todo refactor - SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0]; - int16_t tagId = (int16_t)pExprInfo->base.param[0].i; - SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagId); - - // compare tag first - SVariant t = {0}; - doSetTagValueInParam(pRuntimeEnv->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); - setTimestampListJoinInfo(pRuntimeEnv, &t, pRuntimeEnv->current); - - STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); - if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (taosVariantCompare(&t, elem.tag) != 0))) { - (*status) = BLK_DATA_DISCARD; - return TSDB_CODE_SUCCESS; - } - } - } - // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 || (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) { - (*status) = BLK_DATA_ALL_NEEDED; + (*status) = BLK_DATA_DATA_LOAD; } // check if this data block is required to load - if ((*status) != BLK_DATA_ALL_NEEDED) { + if ((*status) != BLK_DATA_DATA_LOAD) { bool needFilter = true; // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, @@ -2458,18 +2437,18 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc if (needFilter) { (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); } else { - (*status) = BLK_DATA_ALL_NEEDED; + (*status) = BLK_DATA_DATA_LOAD; } } SDataBlockInfo* pBlockInfo = &pBlock->info; // *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); - if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) { + if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) { //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, // pBlockInfo->window.ekey, pBlockInfo->rows); pCost->discardBlocks += 1; - } else if ((*status) == BLK_DATA_STATIS_NEEDED) { + } else if ((*status) == BLK_DATA_SMA_LOAD) { // this function never returns error? pCost->loadBlockStatis += 1; // tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); @@ -2479,7 +2458,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pCost->totalCheckedRows += pBlock->info.rows; } } else { - assert((*status) == BLK_DATA_ALL_NEEDED); + assert((*status) == BLK_DATA_DATA_LOAD); // load the data block statistics to perform further filter pCost->loadBlockStatis += 1; @@ -2511,7 +2490,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc pCost->discardBlocks += 1; //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = BLK_DATA_DISCARD; + (*status) = BLK_DATA_FILTEROUT; return TSDB_CODE_SUCCESS; } } @@ -2523,7 +2502,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc // pCost->discardBlocks += 1; // qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, // pBlockInfo->window.ekey, pBlockInfo->rows); -// (*status) = BLK_DATA_DISCARD; +// (*status) = BLK_DATA_FILTEROUT; // return TSDB_CODE_SUCCESS; // } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0286b6e6e9..901bfde6a4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -73,7 +73,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - *status = BLK_DATA_ALL_NEEDED; + *status = BLK_DATA_DATA_LOAD; SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); if (pCols == NULL) { @@ -138,7 +138,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { // } // this function never returns error? - uint32_t status = BLK_DATA_ALL_NEEDED; + uint32_t status = BLK_DATA_DATA_LOAD; int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { @@ -146,7 +146,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { } // current block is ignored according to filter result by block statistics data, continue load the next block - if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) { + if (status == BLK_DATA_FILTEROUT || pBlock->info.rows == 0) { continue; } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 60566d00d8..e001ef4071 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -557,14 +557,14 @@ static void count_func_merge(SqlFunctionCtx *pCtx) { */ int32_t countRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } else { - return BLK_DATA_STATIS_NEEDED; + return BLK_DATA_SMA_LOAD; } } int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } #define LIST_ADD_N_DOUBLE_FLOAT(x, ctx, p, t, numOfElem, tsdbType) \ do { \ @@ -743,76 +743,76 @@ static void sum_func_merge(SqlFunctionCtx *pCtx) { } static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - return BLK_DATA_STATIS_NEEDED; + return BLK_DATA_SMA_LOAD; } static int32_t dataBlockRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } // todo: if column in current data block are null, opt for this case static int32_t firstFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order == TSDB_ORDER_DESC) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } // no result for first query, data block is required if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } } static int32_t lastFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } } static int32_t firstDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order == TSDB_ORDER_DESC) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } // not initialized yet, it is the first block, load it. if (pCtx->pOutput == NULL) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes); if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { // data in current block is not earlier than current result - return (pInfo->ts <= w->skey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; + return (pInfo->ts <= w->skey) ? BLK_DATA_NOT_LOAD : BLK_DATA_DATA_LOAD; } } static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order != pCtx->param[0].i) { - return BLK_DATA_NO_NEEDED; + return BLK_DATA_NOT_LOAD; } // not initialized yet, it is the first block, load it. if (pCtx->pOutput == NULL) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->pOutput + pCtx->inputBytes); if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; + return BLK_DATA_DATA_LOAD; } else { - return (pInfo->ts > w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; + return (pInfo->ts > w->ekey) ? BLK_DATA_NOT_LOAD : BLK_DATA_DATA_LOAD; } }