diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ec2fadef28..9d32bf830e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -46,7 +46,6 @@ extern "C" { typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) #define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) #define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0) @@ -109,6 +108,7 @@ typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); +typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); typedef struct STaskIdInfo { uint64_t queryId; // this is also a request id @@ -170,8 +170,9 @@ struct SExecTaskInfo { STaskCostInfo cost; int64_t owner; // if it is in execution int32_t code; + int32_t qbufQuota; // total available buffer (in KB) during execution query - int64_t version; // used for stream to record wal version + int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; STableListInfo* pTableInfoList; // this is a table list @@ -198,6 +199,7 @@ typedef struct SOperatorFpSet { __optr_fn_t getNextFn; __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; + __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; __optr_explain_fn_t getExplainFn; @@ -486,12 +488,10 @@ typedef struct STableCountScanSupp { int16_t dbNameSlotId; int16_t stbNameSlotId; int16_t tbCountSlotId; - - bool groupByDbName; - bool groupByStbName; + bool groupByDbName; + bool groupByStbName; char dbNameFilter[TSDB_DB_NAME_LEN]; char stbNameFilter[TSDB_TABLE_NAME_LEN]; - } STableCountScanSupp; typedef struct STableCountScanOperatorInfo { @@ -671,13 +671,14 @@ typedef struct SStreamFillOperatorInfo { #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, - __optr_close_fn_t closeFn, __optr_explain_fn_t explain); -int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); + __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); +int32_t optrDummyOpenFn(SOperatorInfo* pOperator); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo, SExecTaskInfo* pTaskInfo); void destroyOperatorInfo(SOperatorInfo* pOperator); +int32_t optrDefaultBufFn(SOperatorInfo* pOperator); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void cleanupBasicInfo(SOptrBasicInfo* pInfo); @@ -740,6 +741,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); @@ -812,8 +815,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); -int32_t getMaximumIdleDurationSec(); - STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index c432f3c01c..672bb09b14 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -117,7 +117,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, NULL); + createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL); pOperator->cost.openCost = 0; return pOperator; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 546f5cc420..76e27f12fc 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -307,7 +307,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); pOperator->fpSet = - createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); + createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index aedab23f74..60080d204b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -503,7 +503,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } if (pTaskInfo->cost.start == 0) { - pTaskInfo->cost.start = taosGetTimestampMs(); + pTaskInfo->cost.start = taosGetTimestampUs(); } if (isTaskKilled(pTaskInfo)) { @@ -597,7 +597,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { } if (pTaskInfo->cost.start == 0) { - pTaskInfo->cost.start = taosGetTimestampMs(); + pTaskInfo->cost.start = taosGetTimestampUs(); } if (isTaskKilled(pTaskInfo)) { @@ -706,15 +706,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; + int64_t idleTime = pSummary->start - pSummary->created; SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; if (pSummary->pRecoder != NULL) { qDebug( - "%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, " + "%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, " + "createGroupIdMap:%.2f ms, total blocks:%d, " "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, - GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime, - pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, - pRecorder->totalCheckedRows); + GET_TASKID(pTaskInfo), idleTime / 1000.0, pSummary->elapsedTime / 1000.0, pSummary->extractListTime, + pSummary->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, + pRecorder->totalRows, pRecorder->totalCheckedRows); + } else { + qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0, + pSummary->elapsedTime / 1000.0); } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fb705dacfa..a489cd1ac8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -85,8 +85,6 @@ typedef struct SAggOperatorInfo { SExprSupp scalarExprSup; } SAggOperatorInfo; -int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } - static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void releaseQueryBuf(size_t numOfTables); @@ -106,7 +104,7 @@ void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; ASSERT(pOperator->pTaskInfo != NULL); - pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0; + pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0; setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } @@ -120,19 +118,21 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b pOperator->pTaskInfo = pTaskInfo; } -int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { +int32_t optrDummyOpenFn(SOperatorInfo* pOperator) { OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = 0; return TSDB_CODE_SUCCESS; } SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, - __optr_close_fn_t closeFn, __optr_explain_fn_t explain) { + __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, + __optr_explain_fn_t explain) { SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, .cleanupFn = cleanup, .closeFn = closeFn, + .reqBufFn = reqBufFn, .getExplainFn = explain, }; @@ -939,10 +939,10 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u } static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, - const int32_t* rowCellOffset) { + const int32_t* rowEntryOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { - struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset); + SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (!isRowEntryInitialized(pResInfo)) { continue; } @@ -1145,43 +1145,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } } -// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) { -// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// -// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) { -// return; -// } -// -// pQueryAttr->pos = 0; -// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); -// -// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; -// TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle; -// -// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pTsdbReadHandle)) { -// if (isTaskKilled(pRuntimeEnv->qinfo)) { -// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); -// } -// -// if (pQueryAttr->limit.offset > blockInfo.rows) { -// pQueryAttr->limit.offset -= blockInfo.rows; -// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey; -// pTableQueryInfo->lastKey += step; -// -// //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows, -// pQuery->limit.offset); -// } else { // find the appropriated start position in current block -// updateOffsetVal(pRuntimeEnv, &blockInfo); -// break; -// } -// } -// -// if (terrno != TSDB_CODE_SUCCESS) { -// T_LONG_JMP(pRuntimeEnv->env, terrno); -// } -// } - // static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, // STableQueryInfo* pTableQueryInfo) { // STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -1406,11 +1369,11 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; bool hasCountFunc = false; + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - if ((strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "count") == 0) || - (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "hyperloglog") == 0) || - (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_hyperloglog_partial") == 0) || - (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_hyperloglog_merge") == 0)) { + const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; + if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) || + (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) { hasCountFunc = true; break; } @@ -1463,7 +1426,6 @@ static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppB *ppBlock = NULL; } - // this is a blocking operator static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { if (OPTR_IS_OPENED(pOperator)) { @@ -1614,6 +1576,15 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } +// each operator should be set their own function to return total cost buffer +int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { + if (pOperator->blocking) { + ASSERT(0); + } else { + return 0; + } +} + int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) { *defaultPgsz = 4096; while (*defaultPgsz < rowSize * 4) { @@ -1794,7 +1765,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; @@ -1871,8 +1842,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); -SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, - SExecTaskInfo* pTaskInfo); int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 5ed26f627a..8d5af64777 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -381,7 +381,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -1478,7 +1478,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pInfo->srcRowIndex = 0; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 4601175561..5d34064b7e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -470,7 +470,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -850,7 +850,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -1142,7 +1142,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index e7cce39dfd..d460af971c 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -136,7 +136,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->inputOrder = TSDB_ORDER_DESC; } - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index da77facb21..65bb40b195 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -118,8 +118,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, - destroyProjectOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, + destroyProjectOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -415,7 +415,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1b05e8b573..0d245f0815 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -899,8 +899,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, - getTableScannerExecInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, + optrDefaultBufFn, getTableScannerExecInfo); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; @@ -925,7 +925,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL); return pOperator; } @@ -2139,7 +2139,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; _end: @@ -2328,7 +2328,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2465,7 +2465,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2866,8 +2866,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, - getTableMergeScanExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, + optrDefaultBufFn, getTableMergeScanExplainExecInfo); pOperator->cost.openCost = 0; return pOperator; @@ -3015,7 +3015,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL); + createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index b355b85861..005b794f0b 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -75,7 +75,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, getExplainExecInfo); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -521,8 +521,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, - getGroupSortExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, + optrDefaultBufFn, getGroupSortExplainExecInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -798,7 +798,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, - destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); + destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo); code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 96f21babe6..32905ca897 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1437,7 +1437,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL); return pOperator; _error: @@ -1943,7 +1943,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 90f5dde7c3..2374d80bbf 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -544,7 +544,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b0466b6216..2d0893b97b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1049,14 +1049,14 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SIntervalAggOperatorInfo* pInfo = pOperator->info; SExprSupp* pSup = &pOperator->exprSupp; int32_t scanFlag = MAIN_SCAN; - - int64_t st = taosGetTimestampUs(); - SOperatorInfo* downstream = pOperator->pDownstream[0]; + int64_t st = taosGetTimestampUs(); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -1268,9 +1268,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = pInfo->binfo.pRes; - - ASSERT(pInfo->execModel == OPTR_EXEC_MODEL_BATCH); - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; @@ -1765,7 +1762,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->interval = interval; - pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = as; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; @@ -1797,7 +1793,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL); + createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2015,7 +2011,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL); + createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2088,7 +2084,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, optrDefaultBufFn, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2749,7 +2745,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->info = pInfo; pOperator->fpSet = - createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL); + createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); } @@ -3561,7 +3557,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); @@ -3705,8 +3701,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, - destroyStreamSessionAggOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, + destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); } setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4066,7 +4062,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4314,7 +4310,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->win = pTaskInfo->window; iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = interval; - iaInfo->execModel = pTaskInfo->execModel; iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId; iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock; @@ -4344,7 +4339,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, false, OP_NOT_OPENED, miaInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4620,7 +4615,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge pIntervalInfo->win = pTaskInfo->window; pIntervalInfo->inputOrder = TSDB_ORDER_ASC; pIntervalInfo->interval = interval; - pIntervalInfo->execModel = pTaskInfo->execModel; pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock; pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -4650,7 +4644,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL); + createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4866,8 +4860,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, + destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index f06bafafe3..1c74d22a82 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p #if __AVX2__ // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); + int32_t width = (bitWidth >> 3u) / sizeof(int64_t); int32_t remainder = numOfRows % width; int32_t rounds = numOfRows / width; @@ -286,20 +286,11 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p } // let sum up the final results - if (type == TSDB_DATA_TYPE_BIGINT) { - const int64_t* q = (const int64_t*)∑ - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; + const int64_t* q = (const int64_t*)∑ + pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - } else { - const uint64_t* q = (const uint64_t*)∑ - pRes->sum.usum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.usum += (uint64_t)plist[j + rounds * width]; - } + for (int32_t j = 0; j < remainder; ++j) { + pRes->sum.isum += plist[j + rounds * width]; } #endif @@ -588,14 +579,14 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { const int64_t* plist = (const int64_t*) pCol->pData; // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { + if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) { i64VectorSumAVX2(plist, numOfRows, pAvgRes); } else { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { if (type == TSDB_DATA_TYPE_BIGINT) { pAvgRes->sum.isum += plist[i]; } else { - pAvgRes->sum.isum += (uint64_t)plist[i]; + pAvgRes->sum.usum += (uint64_t)plist[i]; } } }