From 531b85ce22e6b83ccbafb1ae96cd025b06a6d98e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Jun 2022 11:54:13 +0800 Subject: [PATCH] enh(query): add interp function. --- include/libs/function/function.h | 1 - include/libs/function/functionMgt.h | 1 + source/libs/executor/inc/executil.h | 5 +- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executil.c | 246 ------------------ source/libs/executor/src/executorimpl.c | 5 - source/libs/executor/src/groupoperator.c | 11 +- source/libs/executor/src/timewindowoperator.c | 145 ++++++++--- source/libs/function/inc/functionMgtInt.h | 1 + source/libs/function/src/builtins.c | 16 +- source/libs/function/src/builtinsimpl.c | 98 ++++++- source/libs/function/src/functionMgt.c | 2 + 12 files changed, 225 insertions(+), 314 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 831c561ceb..b359fa5d6a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -132,7 +132,6 @@ typedef struct SqlFunctionCtx { char *pOutput; // final result output buffer, point to sdata->data int32_t numOfParams; SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param - int64_t *ptsList; // corresponding timestamp array list, todo remove it SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ int32_t offset; struct SResultRowEntryInfo *resultInfo; diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 8bc086ca9e..9ffffcc928 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -176,6 +176,7 @@ bool fmIsRepeatScanFunc(int32_t funcId); bool fmIsUserDefinedFunc(int32_t funcId); bool fmIsDistExecFunc(int32_t funcId); bool fmIsForbidFillFunc(int32_t funcId); +bool fmIsIntervalInterpoFunc(int32_t funcId); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index b8975854c9..ca25b7aab1 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -44,7 +44,6 @@ typedef struct SGroupResInfo { int32_t index; SArray* pRows; // SArray - int32_t position; } SGroupResInfo; typedef struct SResultRow { @@ -56,7 +55,7 @@ typedef struct SResultRow { uint32_t numOfRows; // number of rows of current time window STimeWindow win; struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo -// char *key; // start key of current result row +// char *key; // start key of current result row } SResultRow; typedef struct SResultRowPosition { @@ -71,9 +70,7 @@ typedef struct SResKeyPos { } SResKeyPos; typedef struct SResultRowInfo { - SResultRowPosition *pPosition; // todo remove this int32_t size; // number of result set - int32_t capacity; // max capacity SResultRowPosition cur; SList* openWindow; } SResultRowInfo; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 94c6512e77..92e2fd278a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -472,10 +472,8 @@ typedef struct SIntervalAggOperatorInfo { bool timeWindowInterpo; // interpolation needed or not char** pRow; // previous row/tuple of already processed datablock SArray* pInterpCols; // interpolation columns - STableQueryInfo* pCurrent; // current tableQueryInfo struct int32_t order; // current SSDataBlock scan order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. STimeWindowAggSupp twAggSup; bool invertible; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. @@ -504,8 +502,6 @@ typedef struct SAggOperatorInfo { STableQueryInfo *current; uint64_t groupId; SGroupResInfo groupResInfo; - STableQueryInfo *pTableQueryInfo; - SExprInfo *pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct. @@ -640,8 +636,12 @@ typedef struct SStreamSessionAggOperatorInfo { typedef struct STimeSliceOperatorInfo { SOptrBasicInfo binfo; + STimeWindow win; SInterval interval; + int64_t current; SGroupResInfo groupResInfo; // multiple results build supporter + SArray* pPrevRow; // SArray + SArray* pCols; // SArray } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7550c744c8..6ba1cf859e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -41,13 +41,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { pResultRowInfo->size = 0; - pResultRowInfo->capacity = size; pResultRowInfo->cur.pageId = -1; - - pResultRowInfo->pPosition = taosMemoryCalloc(pResultRowInfo->capacity, sizeof(SResultRowPosition)); - if (pResultRowInfo->pPosition == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } return TSDB_CODE_SUCCESS; } @@ -56,25 +50,14 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { return; } - if (pResultRowInfo->capacity == 0) { -// assert(pResultRowInfo->pResult == NULL); - return; - } - for(int32_t i = 0; i < pResultRowInfo->size; ++i) { // if (pResultRowInfo->pResult[i]) { // taosMemoryFreeClear(pResultRowInfo->pResult[i]->key); // } } - - taosMemoryFreeClear(pResultRowInfo->pPosition); } void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { - if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { - return; - } - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { // SResultRow *pWindowRes = pResultRowInfo->pResult[i]; // clearResultRow(pRuntimeEnv, pWindowRes); @@ -288,232 +271,3 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) { taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); } - -static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { - if (pGroupResInfo->pRows == NULL) { - pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); - } - - size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList); - for(; pGroupResInfo->position < len; ++pGroupResInfo->position) { - SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position); - if (pResultRowCell->groupId != groupId) { - break; - } - - - int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset); - if (num <= 0) { - continue; - } - - taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pos); -// pResultRowCell->pRow->numOfRows = (uint32_t) num; - } - - return TSDB_CODE_SUCCESS; -} - -static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, - int32_t* rowCellInfoOffset) { - bool ascQuery = true; -#if 0 - int32_t code = TSDB_CODE_SUCCESS; - - int32_t *posList = NULL; - SMultiwayMergeTreeInfo *pTree = NULL; - STableQueryInfo **pTableQueryInfoList = NULL; - - size_t size = taosArrayGetSize(pTableList); - if (pGroupResInfo->pRows == NULL) { - pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); - } - - posList = taosMemoryCalloc(size, sizeof(int32_t)); - pTableQueryInfoList = taosMemoryMalloc(POINTER_BYTES * size); - - if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) { -// qError("QInfo:%"PRIu64" failed alloc memory", GET_TASKID(pRuntimeEnv)); - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _end; - } - - int32_t numOfTables = 0; - for (int32_t i = 0; i < size; ++i) { - STableQueryInfo *item = taosArrayGetP(pTableList, i); -// if (item->resInfo.size > 0) { -// pTableQueryInfoList[numOfTables++] = item; -// } - } - - // there is no data in current group - // no need to merge results since only one table in each group -// if (numOfTables == 0) { -// goto _end; -// } - - int32_t order = TSDB_ORDER_ASC; - SCompSupporter cs = {pTableQueryInfoList, posList, order}; - - int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); - if (ret != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _end; - } - - int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX; - int64_t startt = taosGetTimestampMs(); - - while (1) { - int32_t tableIndex = tMergeTreeGetChosenIndex(pTree); - - SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; - ASSERT(0); - SResultRow *pWindowRes = NULL;//getResultRow(pBuf, pWindowResInfo, cs.rowIndex[tableIndex]); - - int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset); - if (num <= 0) { - cs.rowIndex[tableIndex] += 1; - - if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) { - cs.rowIndex[tableIndex] = -1; - if (--numOfTables == 0) { // all input sources are exhausted - break; - } - } - } else { - assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery)); - - if (pWindowRes->win.skey != lastTimestamp) { - taosArrayPush(pGroupResInfo->pRows, &pWindowRes); - pWindowRes->numOfRows = (uint32_t) num; - } - - lastTimestamp = pWindowRes->win.skey; - - // move to the next row of current entry - if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) { - cs.rowIndex[tableIndex] = -1; - - // all input sources are exhausted - if ((--numOfTables) == 0) { - break; - } - } - } - - tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); - } - - int64_t endt = taosGetTimestampMs(); - -// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_TASKID(pRuntimeEnv), -// pGroupResInfo->currentGroup, endt - startt); - - _end: - taosMemoryFreeClear(pTableQueryInfoList); - taosMemoryFreeClear(posList); - taosMemoryFreeClear(pTree); - - return code; -} - -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) { - int64_t st = taosGetTimestampUs(); - - while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset); - - // this group generates at least one result, return results - if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { - break; - } - -// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_TASKID(pRuntimeEnv), pGroupResInfo->currentGroup); - cleanupGroupResInfo(pGroupResInfo); - incNextGroup(pGroupResInfo); - } - -// int64_t elapsedTime = taosGetTimestampUs() - st; -// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv), -// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); -#endif - - return TSDB_CODE_SUCCESS; -} - -//void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) { -// tbufWriteUint32(bw, pDist->numOfTables); -// tbufWriteUint16(bw, pDist->numOfFiles); -// tbufWriteUint64(bw, pDist->totalSize); -// tbufWriteUint64(bw, pDist->totalRows); -// tbufWriteInt32(bw, pDist->maxRows); -// tbufWriteInt32(bw, pDist->minRows); -// tbufWriteUint32(bw, pDist->numOfInmemRows); -// tbufWriteUint32(bw, pDist->numOfSmallBlocks); -// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos)); -// -// // compress the binary string -// char* p = TARRAY_GET_START(pDist->dataBlockInfos); -// -// // compress extra bytes -// size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize; -// char* tmp = taosMemoryMalloc(x + 2); -// -// bool comp = false; -// int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0); -// if (len == -1 || len >= x) { // compress failed, do not compress this binary data -// comp = false; -// len = (int32_t)x; -// } else { -// comp = true; -// } -// -// tbufWriteUint8(bw, comp); -// tbufWriteUint32(bw, len); -// if (comp) { -// tbufWriteBinary(bw, tmp, len); -// } else { -// tbufWriteBinary(bw, p, len); -// } -// taosMemoryFreeClear(tmp); -//} - -//void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) { -// SBufferReader br = tbufInitReader(data, len, false); -// -// pDist->numOfTables = tbufReadUint32(&br); -// pDist->numOfFiles = tbufReadUint16(&br); -// pDist->totalSize = tbufReadUint64(&br); -// pDist->totalRows = tbufReadUint64(&br); -// pDist->maxRows = tbufReadInt32(&br); -// pDist->minRows = tbufReadInt32(&br); -// pDist->numOfInmemRows = tbufReadUint32(&br); -// pDist->numOfSmallBlocks = tbufReadUint32(&br); -// int64_t numSteps = tbufReadUint64(&br); -// -// bool comp = tbufReadUint8(&br); -// uint32_t compLen = tbufReadUint32(&br); -// -// size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo)); -// -// char* outputBuf = NULL; -// if (comp) { -// outputBuf = taosMemoryMalloc(originalLen); -// -// size_t actualLen = compLen; -// const char* compStr = tbufReadBinary(&br, &actualLen); -// -// int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf, -// (int32_t)originalLen , ONE_STAGE_COMP, NULL, 0); -// assert(orignalLen == numSteps *sizeof(SFileBlockInfo)); -// } else { -// outputBuf = (char*) tbufReadBinary(&br, &originalLen); -// } -// -// pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo)); -// if (comp) { -// taosMemoryFreeClear(outputBuf); -// } -//} - diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 789fe9929b..3219ac061f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -118,7 +118,6 @@ static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput); static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); static void destroyOperatorInfo(SOperatorInfo* pOperator); -static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -562,10 +561,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; - if (tsCol != NULL) { - pCtx[k].ptsList = tsCol; - } - // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 132f93a6a5..2aa98e099c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -378,8 +378,9 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { return (rows == 0)? NULL:pRes; } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, + SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -407,7 +408,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -659,8 +660,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pInfo->columnOffset); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) { SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9a7045110c..fcb8f329d6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -30,18 +30,18 @@ static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); -/* - * There are two cases to handle: - * - * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including - * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey. - * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be - * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there - * is a previous result generated or not. - */ -static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { - // do nothing -} +///* +// * There are two cases to handle: +// * +// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including +// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey. +// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be +// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there +// * is a previous result generated or not. +// */ +//static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { +// // do nothing +//} static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } @@ -327,8 +327,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp int32_t index = 1; for (int32_t k = 0; k < numOfExprs; ++k) { - // todo use flag instead of function name - if (strcmp(pCtx[k].pExpr->pExpr->_function.functionName, "twa") != 0) { + if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { pCtx[k].start.key = INT64_MIN; continue; } @@ -958,9 +957,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true); - STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; - - setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); #if 0 // test for encode/decode result info @@ -1415,7 +1411,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt for (int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = pCtx[i].pExpr; - if (strcmp(pExpr->pExpr->_function.functionName, "twa") == 0) { + if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) { SFunctParam* pParam = &pExpr->base.pParam[0]; SColumn c = *pParam->pCol; @@ -1476,11 +1472,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo); if (pInfo->timeWindowInterpo) { pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); - } - - // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { - goto _error; + if (pInfo->binfo.resultRowInfo.openWindow == NULL) { + goto _error; + } } initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); @@ -1695,22 +1689,25 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pBInfo->pRes; } -static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { +static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } STimeSliceOperatorInfo* pSliceInfo = pOperator->info; + SSDataBlock* pResBlock = pSliceInfo->binfo.pRes; + if (pOperator->status == OP_RES_TO_RETURN) { // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { + if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pSliceInfo->binfo.pRes; + return pResBlock; } - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; + SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -1721,7 +1718,38 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); - // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0); + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*) colDataGetData(pTsCol, i); + + if (ts == pSliceInfo->current) { + // output the result + + pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; + } + } else if (ts < pSliceInfo->current) { + if (i != pBlock->info.window.ekey) { + int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1); + if (nextTs > pSliceInfo->current) { + // output the result + + pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; + } + } else { + // keep current row + } + } else { // it is the last row of current block + // keep current row + } + } + } } // restore the value @@ -1733,11 +1761,38 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); - if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { + if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pSliceInfo->binfo.pRes->info.rows == 0 ? NULL : pSliceInfo->binfo.pRes; + return pResBlock->info.rows == 0 ? NULL : pResBlock; +} + +static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) { + pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); + pInfo->pCols = taosArrayInit(4, sizeof(SColumn)); + + if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = pCtx[i].pExpr; + + SFunctParam* pParam = &pExpr->base.pParam[0]; + + SColumn c = *pParam->pCol; + taosArrayPush(pInfo->pCols, &c); + + SGroupKeys key = {0}; + key.bytes = c.bytes; + key.type = c.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, c.bytes); + taosArrayPush(pInfo->pPrevRow, &key); + } + + return TSDB_CODE_SUCCESS; } SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -1748,21 +1803,28 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } + int32_t code = initTimesliceInfo(pInfo, pInfo->binfo.pCtx, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - pOperator->name = "TimeSliceOperator"; - // pOperator->operatorType = OP_AllTimeWindow; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pInfo->binfo.pRes = pResultBlock; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo, + pOperator->name = "TimeSliceOperator"; + // pOperator->operatorType = OP_AllTimeWindow; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: @@ -3326,9 +3388,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); - STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; - - setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index d1af6b6051..7b1326c157 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -42,6 +42,7 @@ extern "C" { #define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13) #define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14) #define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15) +#define FUNC_MGT_INTERVAL_INTERPO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 85da523cd0..ba93fdf716 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1508,6 +1508,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_AVG, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateInNumOutDou, + .dataRequiredFunc = statisDataRequired, .getEnvFunc = getAvgFuncEnv, .initFunc = avgFunctionSetup, .processFunc = avgFunction, @@ -1679,7 +1680,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "elapsed", .type = FUNCTION_TYPE_ELAPSED, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC, .dataRequiredFunc = statisDataRequired, .translateFunc = translateElapsed, .getEnvFunc = getElapsedFuncEnv, @@ -1717,6 +1718,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = NULL, .combineFunc = elapsedCombine, }, +// { +// .name = "interp", +// .type = FUNCTION_TYPE_INTERP, +// .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC, +// .translateFunc = translateFirstLast, +// .getEnvFunc = getSelectivityFuncEnv, +// .initFunc = functionSetup, +// .processFunc = interpFunction, +// .finalizeFunc = NULL +// }, { .name = "derivative", .type = FUNCTION_TYPE_DERIVATIVE, @@ -1762,8 +1773,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "twa", .type = FUNCTION_TYPE_TWA, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC, .translateFunc = translateInNumOutDou, + .dataRequiredFunc = statisDataRequired, .getEnvFunc = getTwaFuncEnv, .initFunc = twaFunctionSetup, .processFunc = twaFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b63406b6da..0dc37c93cd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -933,9 +933,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { index = pInput->pColumnDataAgg[0]->maxIndex; } - // the index is the original position, not the relative position - TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL; - if (!pBuf->assign) { pBuf->v = *(int64_t*)tval; if (pCtx->subsidiaries.num > 0) { @@ -3284,7 +3281,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { SColumnInfoData* pCol = pInput->pData[0]; int32_t start = pInput->startRowIndex; - TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start); + TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; @@ -5085,4 +5082,97 @@ int32_t derivativeFunction(SqlFunctionCtx *pCtx) { } return numOfElems; +} + +int32_t interpFunction(SqlFunctionCtx *pCtx) { +#if 0 + int32_t fillType = (int32_t) pCtx->param[2].i64; + //bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); + + if (pCtx->start.key == pCtx->startTs) { + assert(pCtx->start.key != INT64_MIN); + + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); + + goto interp_success_exit; + } else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); + + goto interp_success_exit; + } + + switch (fillType) { + case TSDB_FILL_NULL: + setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + break; + + case TSDB_FILL_SET_VALUE: + tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); + break; + + case TSDB_FILL_LINEAR: + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs + || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { + goto interp_exit; + } + + double v1 = -1, v2 = -1; + GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); + GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); + + SPoint point1 = {.key = pCtx->start.key, .val = &v1}; + SPoint point2 = {.key = pCtx->end.key, .val = &v2}; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + + int32_t srcType = pCtx->inputType; + if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + } else { + bool exceedMax = false, exceedMin = false; + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); + if (exceedMax || exceedMin) { + __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); + if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); + } else { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); + } + } + } + break; + + case TSDB_FILL_PREV: + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) { + goto interp_exit; + } + + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); + break; + + case TSDB_FILL_NEXT: + if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { + goto interp_exit; + } + + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); + break; + + case TSDB_FILL_NONE: + // do nothing + default: + goto interp_exit; + } + + + interp_success_exit: + *(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs; + INC_INIT_VAL(pCtx, 1); + + interp_exit: + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; + pCtx->endTs = pCtx->startTs; +#endif + + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index caae4c05f7..f240a6737d 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; } bool fmIsForbidFillFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_FILL_FUNC); } +bool fmIsIntervalInterpoFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERVAL_INTERPO_FUNC); } + void fmFuncMgtDestroy() { void* m = gFunMgtService.pFuncNameHashTable; if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {