refactor: add function for extract the required buffer during query.
This commit is contained in:
parent
94ab465b15
commit
5e77b3eca3
|
@ -46,7 +46,6 @@ extern "C" {
|
||||||
|
|
||||||
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
||||||
|
|
||||||
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
|
|
||||||
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
|
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
|
||||||
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
|
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
|
||||||
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
|
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
|
||||||
|
@ -109,6 +108,7 @@ typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
|
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
typedef void (*__optr_close_fn_t)(void* param);
|
typedef void (*__optr_close_fn_t)(void* param);
|
||||||
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
|
||||||
|
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
|
|
||||||
typedef struct STaskIdInfo {
|
typedef struct STaskIdInfo {
|
||||||
uint64_t queryId; // this is also a request id
|
uint64_t queryId; // this is also a request id
|
||||||
|
@ -170,8 +170,9 @@ struct SExecTaskInfo {
|
||||||
STaskCostInfo cost;
|
STaskCostInfo cost;
|
||||||
int64_t owner; // if it is in execution
|
int64_t owner; // if it is in execution
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
int32_t qbufQuota; // total available buffer (in KB) during execution query
|
||||||
|
|
||||||
int64_t version; // used for stream to record wal version
|
int64_t version; // used for stream to record wal version, why not move to sschemainfo
|
||||||
SStreamTaskInfo streamInfo;
|
SStreamTaskInfo streamInfo;
|
||||||
SSchemaInfo schemaInfo;
|
SSchemaInfo schemaInfo;
|
||||||
STableListInfo* pTableInfoList; // this is a table list
|
STableListInfo* pTableInfoList; // this is a table list
|
||||||
|
@ -198,6 +199,7 @@ typedef struct SOperatorFpSet {
|
||||||
__optr_fn_t getNextFn;
|
__optr_fn_t getNextFn;
|
||||||
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||||
__optr_close_fn_t closeFn;
|
__optr_close_fn_t closeFn;
|
||||||
|
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
|
||||||
__optr_encode_fn_t encodeResultRow;
|
__optr_encode_fn_t encodeResultRow;
|
||||||
__optr_decode_fn_t decodeResultRow;
|
__optr_decode_fn_t decodeResultRow;
|
||||||
__optr_explain_fn_t getExplainFn;
|
__optr_explain_fn_t getExplainFn;
|
||||||
|
@ -486,12 +488,10 @@ typedef struct STableCountScanSupp {
|
||||||
int16_t dbNameSlotId;
|
int16_t dbNameSlotId;
|
||||||
int16_t stbNameSlotId;
|
int16_t stbNameSlotId;
|
||||||
int16_t tbCountSlotId;
|
int16_t tbCountSlotId;
|
||||||
|
|
||||||
bool groupByDbName;
|
bool groupByDbName;
|
||||||
bool groupByStbName;
|
bool groupByStbName;
|
||||||
char dbNameFilter[TSDB_DB_NAME_LEN];
|
char dbNameFilter[TSDB_DB_NAME_LEN];
|
||||||
char stbNameFilter[TSDB_TABLE_NAME_LEN];
|
char stbNameFilter[TSDB_TABLE_NAME_LEN];
|
||||||
|
|
||||||
} STableCountScanSupp;
|
} STableCountScanSupp;
|
||||||
|
|
||||||
typedef struct STableCountScanOperatorInfo {
|
typedef struct STableCountScanOperatorInfo {
|
||||||
|
@ -671,13 +671,14 @@ typedef struct SStreamFillOperatorInfo {
|
||||||
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
|
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
|
||||||
|
|
||||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
||||||
__optr_close_fn_t closeFn, __optr_explain_fn_t explain);
|
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
|
||||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
void setOperatorCompleted(SOperatorInfo* pOperator);
|
void setOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
|
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
|
||||||
void* pInfo, SExecTaskInfo* pTaskInfo);
|
void* pInfo, SExecTaskInfo* pTaskInfo);
|
||||||
void destroyOperatorInfo(SOperatorInfo* pOperator);
|
void destroyOperatorInfo(SOperatorInfo* pOperator);
|
||||||
|
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
|
||||||
|
|
||||||
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
|
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
|
||||||
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
|
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
|
||||||
|
@ -740,6 +741,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
|
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
@ -812,8 +815,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
||||||
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
||||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
||||||
|
|
||||||
int32_t getMaximumIdleDurationSec();
|
|
||||||
|
|
||||||
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
||||||
int32_t order);
|
int32_t order);
|
||||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||||
|
|
|
@ -117,7 +117,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
|
@ -307,7 +307,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
|
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -503,7 +503,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->cost.start == 0) {
|
if (pTaskInfo->cost.start == 0) {
|
||||||
pTaskInfo->cost.start = taosGetTimestampMs();
|
pTaskInfo->cost.start = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
@ -597,7 +597,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->cost.start == 0) {
|
if (pTaskInfo->cost.start == 0) {
|
||||||
pTaskInfo->cost.start = taosGetTimestampMs();
|
pTaskInfo->cost.start = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
@ -706,15 +706,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
||||||
|
|
||||||
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
|
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
|
||||||
STaskCostInfo* pSummary = &pTaskInfo->cost;
|
STaskCostInfo* pSummary = &pTaskInfo->cost;
|
||||||
|
int64_t idleTime = pSummary->start - pSummary->created;
|
||||||
|
|
||||||
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
||||||
if (pSummary->pRecoder != NULL) {
|
if (pSummary->pRecoder != NULL) {
|
||||||
qDebug(
|
qDebug(
|
||||||
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
|
"%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
|
||||||
|
"createGroupIdMap:%.2f ms, total blocks:%d, "
|
||||||
"load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
"load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime,
|
GET_TASKID(pTaskInfo), idleTime / 1000.0, pSummary->elapsedTime / 1000.0, pSummary->extractListTime,
|
||||||
pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
|
pSummary->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
|
||||||
pRecorder->totalCheckedRows);
|
pRecorder->totalRows, pRecorder->totalCheckedRows);
|
||||||
|
} else {
|
||||||
|
qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
|
||||||
|
pSummary->elapsedTime / 1000.0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,6 @@ typedef struct SAggOperatorInfo {
|
||||||
SExprSupp scalarExprSup;
|
SExprSupp scalarExprSup;
|
||||||
} SAggOperatorInfo;
|
} SAggOperatorInfo;
|
||||||
|
|
||||||
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
|
|
||||||
|
|
||||||
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
||||||
|
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
|
@ -106,7 +104,7 @@ void setOperatorCompleted(SOperatorInfo* pOperator) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
ASSERT(pOperator->pTaskInfo != NULL);
|
ASSERT(pOperator->pTaskInfo != NULL);
|
||||||
|
|
||||||
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
|
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0;
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,19 +118,21 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
|
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
|
||||||
__optr_close_fn_t closeFn, __optr_explain_fn_t explain) {
|
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
|
||||||
|
__optr_explain_fn_t explain) {
|
||||||
SOperatorFpSet fpSet = {
|
SOperatorFpSet fpSet = {
|
||||||
._openFn = openFn,
|
._openFn = openFn,
|
||||||
.getNextFn = nextFn,
|
.getNextFn = nextFn,
|
||||||
.cleanupFn = cleanup,
|
.cleanupFn = cleanup,
|
||||||
.closeFn = closeFn,
|
.closeFn = closeFn,
|
||||||
|
.reqBufFn = reqBufFn,
|
||||||
.getExplainFn = explain,
|
.getExplainFn = explain,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -939,10 +939,10 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
|
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
|
||||||
const int32_t* rowCellOffset) {
|
const int32_t* rowEntryOffset) {
|
||||||
bool returnNotNull = false;
|
bool returnNotNull = false;
|
||||||
for (int32_t j = 0; j < numOfExprs; ++j) {
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset);
|
SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||||
if (!isRowEntryInitialized(pResInfo)) {
|
if (!isRowEntryInitialized(pResInfo)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1145,43 +1145,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
|
|
||||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
//
|
|
||||||
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pQueryAttr->pos = 0;
|
|
||||||
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
|
||||||
//
|
|
||||||
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
|
||||||
// TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
|
|
||||||
//
|
|
||||||
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
|
||||||
// while (tsdbNextDataBlock(pTsdbReadHandle)) {
|
|
||||||
// if (isTaskKilled(pRuntimeEnv->qinfo)) {
|
|
||||||
// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (pQueryAttr->limit.offset > blockInfo.rows) {
|
|
||||||
// pQueryAttr->limit.offset -= blockInfo.rows;
|
|
||||||
// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
|
|
||||||
// pTableQueryInfo->lastKey += step;
|
|
||||||
//
|
|
||||||
// //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
|
|
||||||
// pQuery->limit.offset);
|
|
||||||
// } else { // find the appropriated start position in current block
|
|
||||||
// updateOffsetVal(pRuntimeEnv, &blockInfo);
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
// T_LONG_JMP(pRuntimeEnv->env, terrno);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
|
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
|
||||||
// STableQueryInfo* pTableQueryInfo) {
|
// STableQueryInfo* pTableQueryInfo) {
|
||||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||||
|
@ -1406,11 +1369,11 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
||||||
|
|
||||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
bool hasCountFunc = false;
|
bool hasCountFunc = false;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
if ((strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "count") == 0) ||
|
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
|
||||||
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "hyperloglog") == 0) ||
|
if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
|
||||||
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_hyperloglog_partial") == 0) ||
|
(strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
|
||||||
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_hyperloglog_merge") == 0)) {
|
|
||||||
hasCountFunc = true;
|
hasCountFunc = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1463,7 +1426,6 @@ static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppB
|
||||||
*ppBlock = NULL;
|
*ppBlock = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// this is a blocking operator
|
// this is a blocking operator
|
||||||
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
if (OPTR_IS_OPENED(pOperator)) {
|
if (OPTR_IS_OPENED(pOperator)) {
|
||||||
|
@ -1614,6 +1576,15 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// each operator should be set their own function to return total cost buffer
|
||||||
|
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->blocking) {
|
||||||
|
ASSERT(0);
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
|
||||||
*defaultPgsz = 4096;
|
*defaultPgsz = 4096;
|
||||||
while (*defaultPgsz < rowSize * 4) {
|
while (*defaultPgsz < rowSize * 4) {
|
||||||
|
@ -1794,7 +1765,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pTableScanInfo = downstream->info;
|
STableScanInfo* pTableScanInfo = downstream->info;
|
||||||
|
@ -1871,8 +1842,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
||||||
|
|
||||||
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
||||||
|
|
||||||
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode,
|
|
||||||
SExecTaskInfo* pTaskInfo);
|
|
||||||
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
|
|
|
@ -381,7 +381,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -1478,7 +1478,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
||||||
pInfo->srcRowIndex = 0;
|
pInfo->srcRowIndex = 0;
|
||||||
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -470,7 +470,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
|
||||||
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -850,7 +850,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -1142,7 +1142,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -136,7 +136,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
pInfo->inputOrder = TSDB_ORDER_DESC;
|
pInfo->inputOrder = TSDB_ORDER_DESC;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
|
||||||
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -118,8 +118,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL,
|
||||||
destroyProjectOperatorInfo, NULL);
|
destroyProjectOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -415,7 +415,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -899,8 +899,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
|
||||||
getTableScannerExecInfo);
|
optrDefaultBufFn, getTableScannerExecInfo);
|
||||||
|
|
||||||
// for non-blocking operator, the open cost is always 0
|
// for non-blocking operator, the open cost is always 0
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
|
@ -925,7 +925,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2139,7 +2139,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -2328,7 +2328,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -2465,7 +2465,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -2866,8 +2866,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
|
||||||
getTableMergeScanExplainExecInfo);
|
optrDefaultBufFn, getTableMergeScanExplainExecInfo);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -3015,7 +3015,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
|
||||||
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -75,7 +75,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
// TODO dynamic set the available sort buffer
|
// TODO dynamic set the available sort buffer
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, getExplainExecInfo);
|
createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -521,8 +521,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
||||||
|
|
||||||
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
|
||||||
getGroupSortExplainExecInfo);
|
optrDefaultBufFn, getGroupSortExplainExecInfo);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -798,7 +798,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
|
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
|
||||||
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
|
destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, downStreams, numStreams);
|
code = appendDownstream(pOperator, downStreams, numStreams);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -1437,7 +1437,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -1943,7 +1943,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
||||||
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -544,7 +544,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
|
|
@ -1050,13 +1050,13 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
int32_t scanFlag = MAIN_SCAN;
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
@ -1268,9 +1268,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
||||||
|
|
||||||
ASSERT(pInfo->execModel == OPTR_EXEC_MODEL_BATCH);
|
|
||||||
|
|
||||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1765,7 +1762,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
||||||
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
pInfo->interval = interval;
|
pInfo->interval = interval;
|
||||||
pInfo->execModel = pTaskInfo->execModel;
|
|
||||||
pInfo->twAggSup = as;
|
pInfo->twAggSup = as;
|
||||||
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
|
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
|
||||||
|
|
||||||
|
@ -1797,7 +1793,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL);
|
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2015,7 +2011,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
||||||
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL);
|
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2088,7 +2084,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2749,7 +2745,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
|
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
||||||
}
|
}
|
||||||
|
@ -3561,7 +3557,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
if (downstream) {
|
if (downstream) {
|
||||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||||
|
@ -3705,8 +3701,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
||||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, NULL);
|
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
@ -4066,7 +4062,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -4314,7 +4310,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
iaInfo->win = pTaskInfo->window;
|
iaInfo->win = pTaskInfo->window;
|
||||||
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
iaInfo->inputOrder = TSDB_ORDER_ASC;
|
||||||
iaInfo->interval = interval;
|
iaInfo->interval = interval;
|
||||||
iaInfo->execModel = pTaskInfo->execModel;
|
|
||||||
iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
|
iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
|
||||||
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
|
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
|
||||||
|
|
||||||
|
@ -4344,7 +4339,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
|
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -4620,7 +4615,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
||||||
pIntervalInfo->win = pTaskInfo->window;
|
pIntervalInfo->win = pTaskInfo->window;
|
||||||
pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
|
pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
|
||||||
pIntervalInfo->interval = interval;
|
pIntervalInfo->interval = interval;
|
||||||
pIntervalInfo->execModel = pTaskInfo->execModel;
|
|
||||||
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
|
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
|
||||||
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
|
|
||||||
|
@ -4650,7 +4644,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
||||||
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
|
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
|
||||||
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
|
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL);
|
createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -4866,8 +4860,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
|
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -286,21 +286,12 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
|
||||||
}
|
}
|
||||||
|
|
||||||
// let sum up the final results
|
// let sum up the final results
|
||||||
if (type == TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
const int64_t* q = (const int64_t*)∑
|
const int64_t* q = (const int64_t*)∑
|
||||||
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
|
pRes->sum.isum += q[0] + q[1] + q[2] + q[3];
|
||||||
|
|
||||||
for (int32_t j = 0; j < remainder; ++j) {
|
for (int32_t j = 0; j < remainder; ++j) {
|
||||||
pRes->sum.isum += plist[j + rounds * width];
|
pRes->sum.isum += plist[j + rounds * width];
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
const uint64_t* q = (const uint64_t*)∑
|
|
||||||
pRes->sum.usum += q[0] + q[1] + q[2] + q[3];
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < remainder; ++j) {
|
|
||||||
pRes->sum.usum += (uint64_t)plist[j + rounds * width];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -588,14 +579,14 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
const int64_t* plist = (const int64_t*) pCol->pData;
|
const int64_t* plist = (const int64_t*) pCol->pData;
|
||||||
|
|
||||||
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
|
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
|
||||||
if (simdAvailable) {
|
if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) {
|
||||||
i64VectorSumAVX2(plist, numOfRows, pAvgRes);
|
i64VectorSumAVX2(plist, numOfRows, pAvgRes);
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (type == TSDB_DATA_TYPE_BIGINT) {
|
if (type == TSDB_DATA_TYPE_BIGINT) {
|
||||||
pAvgRes->sum.isum += plist[i];
|
pAvgRes->sum.isum += plist[i];
|
||||||
} else {
|
} else {
|
||||||
pAvgRes->sum.isum += (uint64_t)plist[i];
|
pAvgRes->sum.usum += (uint64_t)plist[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue