Merge pull request #11117 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
This commit is contained in:
Haojun Liao 2022-03-30 14:53:29 +08:00 committed by GitHub
commit 1263b8b5b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 385 additions and 473 deletions

View File

@ -133,7 +133,8 @@ static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData,
}
static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_BIGINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UBIGINT);
int32_t type = pColumnInfoData->info.type;
ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int64_t*)p = *(int64_t*)v;
}
@ -175,18 +176,17 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks);
#ifdef __cplusplus

View File

@ -469,8 +469,7 @@ typedef struct {
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision;
int64_t interval;
int64_t sliding;

View File

@ -41,6 +41,7 @@ typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
FExecGetEnv getEnv;
FScalarExecProcess process;
} SScalarFuncExecFuncs;
@ -241,7 +242,6 @@ typedef struct tExprNode {
};
} tExprNode;
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
typedef struct SAggFunctionInfo {
@ -267,28 +267,6 @@ struct SScalarParam {
int32_t numOfRows;
};
typedef struct SMultiFunctionsDesc {
bool stableQuery;
bool groupbyColumn;
bool agg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
bool globalMerge;
bool multigroupResult;
bool blockDistribution;
bool stateWindow;
bool timewindow;
bool sessionWindow;
bool topbotQuery;
bool interpQuery;
bool distinct;
bool join;
bool continueQuery;
} SMultiFunctionsDesc;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable);
@ -296,8 +274,6 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct
tExprNode* exprTreeFromBinary(const void* data, size_t size);
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
tExprNode* exprdup(tExprNode* pTree);
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);

View File

@ -58,6 +58,14 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}
#endif

View File

@ -331,7 +331,6 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
return 0;
}
ASSERT(pColInfoData->nullbitmap == NULL);
pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0);
pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
return 0;
@ -609,22 +608,6 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
}
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) {
SSchema* pSchema = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(SSchema));
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
pSchema[i].bytes = pColInfoData->info.bytes;
pSchema[i].type = pColInfoData->info.type;
pSchema[i].colId = pColInfoData->info.colId;
}
if (numOfCols != NULL) {
*numOfCols = pBlock->info.numOfCols;
}
return pSchema;
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL);
double rowSize = 0;

View File

@ -161,20 +161,8 @@ typedef struct STaskCostInfo {
typedef struct SOperatorCostInfo {
uint64_t openCost;
uint64_t execCost;
// uint64_t totalRows;
// uint64_t totalBytes;
} SOperatorCostInfo;
typedef struct {
int64_t vgroupLimit;
int64_t ts;
} SOrderedPrjQueryInfo;
typedef struct {
char* tags;
SArray* pResult; // SArray<SStddevInterResult>
} SInterResult;
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef struct STaskAttr {
@ -230,7 +218,6 @@ typedef struct STaskAttr {
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo;
// SFilterInfo *pFilters;
@ -245,6 +232,7 @@ struct SOperatorInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
@ -330,11 +318,12 @@ typedef struct SOperatorInfo {
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn;
__optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_encode_fn_t encodeResultRow; //
__optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
} SOperatorInfo;
@ -363,18 +352,18 @@ typedef struct SQInfo {
STaskCostInfo summary;
} SQInfo;
enum {
DATA_NOT_READY = 0x1,
DATA_READY = 0x2,
DATA_EXHAUSTED = 0x3,
};
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
typedef struct SSourceDataInfo {
struct SExchangeInfo *pEx;
int32_t index;
SRetrieveTableRsp *pRsp;
uint64_t totalRows;
int32_t status;
EX_SOURCE_STATUS status;
} SSourceDataInfo;
typedef struct SLoadRemoteDataInfo {
@ -383,12 +372,6 @@ typedef struct SLoadRemoteDataInfo {
uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo;
enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
};
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
@ -483,17 +466,24 @@ typedef struct SAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef enum {
OPTR_EXEC_MODEL_BATCH = 0x1,
OPTR_EXEC_MODEL_STREAM = 0x2,
} OPTR_EXEC_MODEL;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo;
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
int32_t precision;
bool timeWindowInterpo;
char **pRow;
SAggSupporter aggSup;
STableQueryInfo *pCurrent;
int32_t order;
SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
char **pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo *pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
@ -695,12 +685,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);

View File

@ -1014,8 +1014,35 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* p
return num;
}
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
// query_range_start, query_range_end, window_duration, window_start, window_end
static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
pColData->info.bytes = sizeof(int64_t);
blockDataEnsureColumnCapacity(pColData, 5);
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
int64_t interval = 0;
colDataAppendInt64(pColData, 2, &interval); // this value may be variable in case of 'n' and 'y'.
colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) {
int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + 1;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + 1; // window end key
}
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function
updateTimeWindowInfo(pTimeWindowData, pWin);
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
@ -1038,6 +1065,21 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of
pCtx[k].isAggSet = false;
}
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SScalarParam out = {.columnData = NULL};
out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
out.columnData->info.type = TSDB_DATA_TYPE_BIGINT;
out.columnData->info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
out.columnData->pData = p;
pCtx[k].sfp.process(&intervalParam, 1, &out);
pEntryInfo->numOfRes = 1;
pEntryInfo->hasResult = ',';
continue;
}
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].fpSet.process(&pCtx[k]);
}
@ -1054,7 +1096,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
int32_t order = pInfo->order;
bool ascQuery = (order == TSDB_ORDER_ASC);
int32_t precision = pInfo->precision;
int32_t precision = pInterval->precision;
getNextTimeWindow(pInterval, precision, order, pNext);
// next time window is not in current block
@ -1489,15 +1531,19 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
}
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId) {
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SArray* pUpdated = NULL;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pUpdated = taosArrayInit(4, sizeof(SResultRowPosition));
}
int32_t step = 1;
bool ascQuery = true;
bool ascScan = true;
int32_t prevIndex = pResultRowInfo->curPos;
@ -1509,10 +1555,10 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->precision, &pInfo->win);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win);
bool masterScan = true;
SResultRow* pResult = NULL;
@ -1523,6 +1569,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosArrayPush(pUpdated, &pos);
}
int32_t forwardStep = 0;
TSKEY ekey = win.ekey;
forwardStep =
@ -1534,8 +1585,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow* pRes = getResultRow(pResultRowInfo, j);
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) &&
resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
@ -1548,14 +1598,13 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
// restore current time window
@ -1570,8 +1619,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
pInfo->order, false);
doApplyFunctions(pInfo->binfo.pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput,
TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win;
while (1) {
@ -1589,6 +1637,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosArrayPush(pUpdated, &pos);
}
ekey = nextWin.ekey; // reviseWindowEkey(pQueryAttr, &nextWin);
forwardStep =
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
@ -1596,15 +1649,15 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
pInfo->order, false);
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput,
TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
if (pInfo->timeWindowInterpo) {
int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols);
}
return pUpdated;
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
}
@ -1841,7 +1894,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = j - num;
doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
@ -1859,7 +1912,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
}
}
@ -1910,8 +1963,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
// pInfo->numOfRows data belong to the current session window
doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput,
TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
@ -1931,8 +1983,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput,
TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
@ -1999,11 +2050,7 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return false;
}
if (functionId == FUNCTION_TS) {
return true;
}
if (isRowEntryCompleted(pResInfo) || functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
if (isRowEntryCompleted(pResInfo)) {
return false;
}
@ -2118,6 +2165,9 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
} else {
fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
if (pCtx->sfp.getEnv != NULL) {
pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
}
}
pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
@ -3590,6 +3640,42 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
}
}
void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
int32_t* rowCellInfoOffset) {
size_t num = taosArrayGetSize(pUpdateList);
for (int32_t i = 0; i < num; ++i) {
SResultRowPosition* pPos = taosArrayGet(pUpdateList, i);
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
continue;
}
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
pCtx[j].fpSet.finalize(&pCtx[j]);
}
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
}
}
releaseBufPage(pBuf, bufPage);
/*
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
}
}
static bool hasMainOutput(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
@ -3680,7 +3766,6 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes
void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
@ -3688,6 +3773,11 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
continue;
}
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue;
}
// int32_t functionId = pCtx[i].functionId;
// if (functionId < 0) {
// continue;
@ -4032,8 +4122,7 @@ static void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDa
return;
}
int32_t orderType =
TSDB_ORDER_ASC; //(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
int32_t orderType = TSDB_ORDER_ASC;
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset);
// add condition (pBlock->info.rows >= 1) just to runtime happy
@ -5074,12 +5163,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == DATA_EXHAUSTED) {
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
continue;
}
if (pDataInfo->status != DATA_READY) {
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
continue;
}
@ -5093,7 +5182,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
continue;
}
@ -5111,16 +5200,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
}
if (pDataInfo->status != DATA_EXHAUSTED) {
pDataInfo->status = DATA_NOT_READY;
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -5223,7 +5311,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
continue;
}
@ -5240,7 +5328,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);
pDataInfo->status = DATA_EXHAUSTED;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
@ -6798,37 +6886,6 @@ static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
static SSDataBlock* doFilter(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SFilterOperatorInfo* pCondInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
doSetFilterColumnInfo(pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock);
assert(pRuntimeEnv->pTsBuf == NULL);
filterRowsInDataBlock(pRuntimeEnv, pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock, true);
if (pBlock->info.rows > 0) {
return pBlock;
}
}
doSetOperatorCompleted(pOperator);
return NULL;
}
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
@ -6837,7 +6894,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
STableIntervalOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
// STimeWindow win = pQueryAttr->window;
// STimeWindow win = {0};
bool newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -6851,7 +6908,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
@ -6890,6 +6946,59 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
STableIntervalOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pInfo->binfo.pRes;
}
// STimeWindow win = {0};
bool newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the caller.
// Note that all the time window are not close till now.
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
pInfo->binfo.rowCellInfoOffset);
ASSERT(pInfo->binfo.pRes->info.rows > 0);
pOperator->status = OP_RES_TO_RETURN;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -7514,8 +7623,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t numOfRows = 1;
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
@ -7735,39 +7843,6 @@ _error:
return NULL;
}
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
#if 0
SColumnInfo* pCols = taosMemoryCalloc(numOfOutput, sizeof(SColumnInfo));
int32_t numOfFilter = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
if (pExpr[i].base.flist.numOfFilters > 0) {
numOfFilter += 1;
}
pCols[i].type = pExpr[i].base.resSchema.type;
pCols[i].bytes = pExpr[i].base.resSchema.bytes;
pCols[i].colId = pExpr[i].base.resSchema.colId;
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
if (pCols[i].flist.numOfFilters != 0) {
pCols[i].flist.filterInfo = taosMemoryCalloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
} else {
// avoid runtime error
pCols[i].flist.filterInfo = NULL;
}
}
assert(numOfFilter > 0);
*numOfFilterCols = numOfFilter;
return pCols;
#endif
return 0;
}
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
SLimitOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SLimitOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -7786,9 +7861,10 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit
pOperator->getNextFn = doLimit;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
@ -7806,16 +7882,16 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MILLI;
pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval;
pInfo->win.skey = INT64_MIN;
pInfo->execModel = OPTR_EXEC_MODEL_BATCH;
pInfo->win.skey = 0;
pInfo->win.ekey = INT64_MAX;
int32_t numOfRows = 4096;
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
goto _error;
@ -8555,6 +8631,23 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s;
}
static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) {
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
if (pCol == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pCol->slotId = slotId;
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->precision = pType->precision;
pCol->dataBlockId = blockId;
return pCol;
}
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
int32_t numOfGroupKeys = 0;
@ -8586,18 +8679,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1;
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
SColumn* pCol = pExp->base.pParam[0].pCol;
pCol->slotId = pColNode->slotId; // TODO refactor
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->precision = pType->precision;
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
@ -8608,8 +8694,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->pExpr->_function.functionId = pFuncNode->funcId;
pExp->pExpr->_function.pFunctNode = pFuncNode;
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
tListLen(pExp->pExpr->_function.functionName));
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
// TODO: value parameter needs to be handled
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
@ -8620,21 +8705,12 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
for (int32_t j = 0; j < numOfParam; ++j) {
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
if (p1->type == QUERY_NODE_COLUMN) {
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
SColumnNode* pcn = (SColumnNode*) p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn));
SColumn* pCol = pExp->base.pParam[j].pCol;
pCol->slotId = pcn->slotId;
pCol->bytes = pcn->node.resType.bytes;
pCol->type = pcn->node.resType.type;
pCol->scale = pcn->node.resType.scale;
pCol->precision = pcn->node.resType.precision;
pCol->dataBlockId = pcn->dataBlockId;
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType);
} else if (p1->type == QUERY_NODE_VALUE) {
SValueNode* pvn = (SValueNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
}
}
@ -8644,21 +8720,14 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
pExp->base.numOfParams = 1;
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
SDataType* pType = &pNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pNode->node.aliasName);
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
SColumn* pCol = pExp->base.pParam[0].pCol;
pCol->slotId = pTargetNode->slotId; // TODO refactor
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->precision = pType->precision;
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType);
} else {
ASSERT(0);
}
@ -8692,17 +8761,15 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
static SArray* createSortInfo(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
tsdbReaderT pDataReader =
doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
SArray* pColList =
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pTaskInfo);
@ -8745,7 +8812,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
@ -8757,7 +8824,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
@ -8778,7 +8845,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -8791,7 +8858,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset};
.offset = pIntervalPhyNode->offset,
.precision = TSDB_TIME_PRECISION_MILLI};
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
@ -8799,7 +8867,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
@ -8811,7 +8879,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
@ -8827,7 +8895,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
@ -8972,11 +9040,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
for (int32_t i = 0; i < num; ++i) {
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
// if (pNode->output) {
if (pNode->output) {
(*numOfOutputCols) += 1;
// } else {
// info->output = false;
// }
} else {
info->output = false;
}
}
return pList;
@ -9045,7 +9113,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
STableGroupInfo group = {0};
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
if (NULL == (*pTaskInfo)->pRoot) {
code = terrno;
goto _complete;

View File

@ -315,31 +315,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_qstartts",
.type = FUNCTION_TYPE_QSTARTTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = qStartTsFunction,
.finalizeFunc = NULL
},
{
.name = "_qendts",
.type = FUNCTION_TYPE_QENDTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = qEndTsFunction,
.finalizeFunc = NULL
},
{
.name = "_wstartts",
.type = FUNCTION_TYPE_QSTARTTS,
.type = FUNCTION_TYPE_WSTARTTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = winStartTsFunction,
.finalizeFunc = NULL
},
{
@ -347,9 +347,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_QENDTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = winEndTsFunction,
.finalizeFunc = NULL
},
{
@ -357,9 +357,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_WDURATION,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = winDurFunction,
.finalizeFunc = NULL
}
};
@ -368,6 +368,7 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
switch(pFunc->funcType) {
case FUNCTION_TYPE_WDURATION:
case FUNCTION_TYPE_COUNT:
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
break;
@ -400,14 +401,18 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
}
case FUNCTION_TYPE_CONCAT:
case FUNCTION_TYPE_ROWTS:
case FUNCTION_TYPE_TBNAME:
case FUNCTION_TYPE_QSTARTTS:
case FUNCTION_TYPE_QENDTS:
case FUNCTION_TYPE_WSTARTTS:
case FUNCTION_TYPE_WENDTS:
case FUNCTION_TYPE_WDURATION:
case FUNCTION_TYPE_TBNAME: {
// todo
break;
}
case FUNCTION_TYPE_QENDTS:
case FUNCTION_TYPE_QSTARTTS:
case FUNCTION_TYPE_WENDTS:
case FUNCTION_TYPE_WSTARTTS: {
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
break;
}
case FUNCTION_TYPE_ABS:
case FUNCTION_TYPE_CEIL:

View File

@ -92,6 +92,7 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
return TSDB_CODE_FAILED;
}
pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
return TSDB_CODE_SUCCESS;
}

View File

@ -116,42 +116,6 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp
return param->nodeFilterFn(pItem, pExpr->_node.info);
}
static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
tbufWriteUint8(bw, expr->nodeType);
if (expr->nodeType == TEXPR_VALUE_NODE) {
SVariant* pVal = expr->pVal;
tbufWriteUint32(bw, pVal->nType);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
tbufWriteInt32(bw, pVal->nLen);
tbufWrite(bw, pVal->pz, pVal->nLen);
} else {
tbufWriteInt64(bw, pVal->i);
}
} else if (expr->nodeType == TEXPR_COL_NODE) {
SSchema* pSchema = expr->pSchema;
tbufWriteInt16(bw, pSchema->colId);
tbufWriteInt16(bw, pSchema->bytes);
tbufWriteUint8(bw, pSchema->type);
tbufWriteString(bw, pSchema->name);
} else if (expr->nodeType == TEXPR_BINARYEXPR_NODE) {
tbufWriteUint8(bw, expr->_node.optr);
exprTreeToBinaryImpl(bw, expr->_node.pLeft);
exprTreeToBinaryImpl(bw, expr->_node.pRight);
}
}
void exprTreeToBinary(SBufferWriter* bw, tExprNode* expr) {
if (expr != NULL) {
exprTreeToBinaryImpl(bw, expr);
}
}
// TODO: these three functions should be made global
static void* exception_calloc(size_t nmemb, size_t size) {
void* p = taosMemoryCalloc(nmemb, size);
@ -230,97 +194,6 @@ tExprNode* exprTreeFromBinary(const void* data, size_t size) {
return exprTreeFromBinaryImpl(&br);
}
tExprNode* exprTreeFromTableName(const char* tbnameCond) {
if (!tbnameCond) {
return NULL;
}
int32_t anchor = CLEANUP_GET_ANCHOR();
tExprNode* expr = exception_calloc(1, sizeof(tExprNode));
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
expr->nodeType = TEXPR_BINARYEXPR_NODE;
tExprNode* left = exception_calloc(1, sizeof(tExprNode));
expr->_node.pLeft = left;
left->nodeType = TEXPR_COL_NODE;
SSchema* pSchema = exception_calloc(1, sizeof(SSchema));
left->pSchema = pSchema;
// *pSchema = NULL;//*tGetTbnameColumnSchema();
tExprNode* right = exception_calloc(1, sizeof(tExprNode));
expr->_node.pRight = right;
if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) {
right->nodeType = TEXPR_VALUE_NODE;
expr->_node.optr = OP_TYPE_LIKE;
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
right->pVal = pVal;
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1;
pVal->pz = exception_malloc(len);
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len);
pVal->nType = TSDB_DATA_TYPE_BINARY;
pVal->nLen = (int32_t)len;
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_MATCH, QUERY_COND_REL_PREFIX_MATCH_LEN) == 0) {
right->nodeType = TEXPR_VALUE_NODE;
expr->_node.optr = OP_TYPE_MATCH;
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
right->pVal = pVal;
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN) + 1;
pVal->pz = exception_malloc(len);
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN, len);
pVal->nType = TSDB_DATA_TYPE_BINARY;
pVal->nLen = (int32_t)len;
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_NMATCH, QUERY_COND_REL_PREFIX_NMATCH_LEN) == 0) {
right->nodeType = TEXPR_VALUE_NODE;
expr->_node.optr = OP_TYPE_NMATCH;
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
right->pVal = pVal;
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN) + 1;
pVal->pz = exception_malloc(len);
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN, len);
pVal->nType = TSDB_DATA_TYPE_BINARY;
pVal->nLen = (int32_t)len;
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) {
right->nodeType = TEXPR_VALUE_NODE;
expr->_node.optr = OP_TYPE_IN;
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
right->pVal = pVal;
pVal->nType = TSDB_DATA_TYPE_POINTER_ARRAY;
pVal->arr = taosArrayInit(2, POINTER_BYTES);
const char* cond = tbnameCond + QUERY_COND_REL_PREFIX_IN_LEN;
for (const char *e = cond; *e != 0; e++) {
if (*e == TS_PATH_DELIMITER[0]) {
cond = e + 1;
} else if (*e == ',') {
size_t len = e - cond;
char* p = exception_malloc(len + VARSTR_HEADER_SIZE);
STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)len);
cond += len;
taosArrayPush(pVal->arr, &p);
}
}
if (*cond != 0) {
size_t len = strlen(cond) + VARSTR_HEADER_SIZE;
char* p = exception_malloc(len);
STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)(len - VARSTR_HEADER_SIZE));
taosArrayPush(pVal->arr, &p);
}
taosArraySortString(pVal->arr, taosArrayCompareString);
}
CLEANUP_EXECUTE_TO(anchor, false);
return expr;
}
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
SBufferReader br = tbufInitReader(buf, len, false);
uint32_t type = tbufReadUint32(&br);

View File

@ -1,13 +0,0 @@
#include "tunaryoperator.h"
// TODO dynamic define these functions
//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) {
// assert(0);
//}
//bool isStringOperatorFn(int32_t op) {
// return op == FUNCTION_LENGTH;
//}

View File

@ -288,7 +288,7 @@ _return:
SCL_RET(code);
}
int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -420,7 +420,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
SFunctionNode *node = (SFunctionNode *)*pNode;
SScalarParam output = {0};
ctx->code = sclExecFuncion(node, ctx, &output);
ctx->code = sclExecFunction(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
@ -547,7 +547,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
SFunctionNode *node = (SFunctionNode *)pNode;
SScalarParam output = {0};
ctx->code = sclExecFuncion(node, ctx, &output);
ctx->code = sclExecFunction(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
@ -667,7 +667,7 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
int32_t code = 0;
SScalarCtx ctx = {0};
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -689,7 +689,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
int32_t code = 0;
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};
// TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
@ -716,6 +716,3 @@ _return:
sclFreeRes(ctx.pRes);
return code;
}

View File

@ -377,3 +377,34 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
}
}
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(int64_t);
return true;
}
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
}
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
}
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
}
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3));
return TSDB_CODE_SUCCESS;
}
int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
return TSDB_CODE_SUCCESS;
}