From 1652cd0ec2a47955dffdfc0ee126a1a9b608eaef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 29 May 2022 12:35:11 +0800 Subject: [PATCH] enh(query): enable twa function in select clause. --- include/common/tmsg.h | 7 - include/libs/executor/executor.h | 12 - include/libs/function/function.h | 47 +-- include/util/tlist.h | 2 +- source/libs/executor/inc/executil.h | 1 + source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executil.c | 28 +- source/libs/executor/src/executorimpl.c | 104 +---- source/libs/executor/src/groupoperator.c | 12 +- source/libs/executor/src/scanoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 318 ++++++++++----- source/libs/function/inc/builtinsimpl.h | 4 + source/libs/function/inc/taggfunction.h | 7 - source/libs/function/src/builtins.c | 13 +- source/libs/function/src/builtinsimpl.c | 373 ++++++++++++++++-- source/libs/function/src/taggfunction.c | 3 +- source/util/src/tlist.c | 2 +- 17 files changed, 595 insertions(+), 348 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index faf4addb4b..c4abfffc61 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -571,13 +571,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); -typedef struct { - int16_t colId; // column id - int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag - int16_t flag; // denote if it is a tag or a normal column - char name[TSDB_DB_FNAME_LEN]; -} SColIndex; - typedef struct { int16_t lowerRelOptr; int16_t upperRelOptr; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5379a8f712..288248422b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -156,18 +156,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); */ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); -/** - * Create the table group according to the group by tags info - * @param pTableIdList - * @param skey - * @param groupInfo - * @param groupByIndex - * @param numOfIndex - * @return - */ -// int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* -// groupByIndex, int32_t numOfIndex); - /** * Update the table id list of a given query. * @param uid child table uid diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 21b7309055..01f347fd05 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -67,50 +67,6 @@ typedef struct SFileBlockInfo { #define TOP_BOTTOM_QUERY_LIMIT 100 #define FUNCTIONS_NAME_MAX_LENGTH 16 -#define FUNCTION_INVALID_ID -1 -#define FUNCTION_COUNT 0 -#define FUNCTION_SUM 1 -#define FUNCTION_AVG 2 -#define FUNCTION_MIN 3 -#define FUNCTION_MAX 4 -#define FUNCTION_STDDEV 5 -#define FUNCTION_PERCT 6 -#define FUNCTION_APERCT 7 -#define FUNCTION_FIRST 8 -#define FUNCTION_LAST 9 -#define FUNCTION_LAST_ROW 10 -#define FUNCTION_TOP 11 -#define FUNCTION_BOTTOM 12 -#define FUNCTION_SPREAD 13 -#define FUNCTION_TWA 14 -#define FUNCTION_LEASTSQR 15 - -#define FUNCTION_TS 16 -#define FUNCTION_TS_DUMMY 17 -#define FUNCTION_TAG_DUMMY 18 -#define FUNCTION_TS_COMP 19 - -#define FUNCTION_TAG 20 -#define FUNCTION_PRJ 21 - -#define FUNCTION_TAGPRJ 22 -#define FUNCTION_ARITHM 23 -#define FUNCTION_DIFF 24 - -#define FUNCTION_FIRST_DST 25 -#define FUNCTION_LAST_DST 26 -#define FUNCTION_STDDEV_DST 27 -#define FUNCTION_INTERP 28 - -#define FUNCTION_RATE 29 -#define FUNCTION_IRATE 30 -#define FUNCTION_TID_TAG 31 -#define FUNCTION_DERIVATIVE 32 -#define FUNCTION_BLKINFO 33 - - -#define FUNCTION_COV 38 - typedef struct SResultRowEntryInfo { bool initialized:1; // output buffer has been initialized bool complete:1; // query has completed @@ -180,10 +136,9 @@ typedef struct SqlFunctionCtx { char *pOutput; // final result output buffer, point to sdata->data int32_t numOfParams; SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param - int64_t *ptsList; // corresponding timestamp array list + int64_t *ptsList; // corresponding timestamp array list, todo remove it SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ int32_t offset; - SVariant tag; struct SResultRowEntryInfo *resultInfo; SSubsidiaryResInfo subsidiaries; SPoint1 start; diff --git a/include/util/tlist.h b/include/util/tlist.h index 43833d7ecd..1954bda145 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -229,7 +229,7 @@ int32_t tdListAppend(SList *list, void *data); SListNode *tdListPopHead(SList *list); SListNode *tdListPopTail(SList *list); SListNode *tdListGetHead(SList *list); -SListNode *tsListGetTail(SList *list); +SListNode *tdListGetTail(SList *list); SListNode *tdListPopNode(SList *list, SListNode *node); void tdListMove(SList *src, SList *dst); void tdListDiscard(SList *list); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 63c398618f..dcd4e8f918 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -75,6 +75,7 @@ typedef struct SResultRowInfo { int32_t size; // number of result set int32_t capacity; // max capacity SResultRowPosition cur; + SList* openWindow; } SResultRowInfo; struct SqlFunctionCtx; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9e15f3b12d..3f25939bd2 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -451,15 +451,15 @@ typedef struct SIntervalAggOperatorInfo { int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. STimeWindow win; // query time range bool timeWindowInterpo; // interpolation needed or not - char** pRow; // previous row/tuple of already processed datablock + SArray* pInterpCols; // interpolation columns SAggSupporter aggSup; // aggregate supporter STableQueryInfo* pCurrent; // current tableQueryInfo struct int32_t order; // current SSDataBlock scan order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. STimeWindowAggSupp twAggSup; - struct SFillInfo* pFillInfo; // fill info bool invertible; + SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. } SIntervalAggOperatorInfo; typedef struct SStreamFinalIntervalOperatorInfo { @@ -802,7 +802,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey); -SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5a02547f58..b8915ae218 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -258,32 +258,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { return (int32_t) taosArrayGetSize(pGroupResInfo->pRows); } -static int64_t getNumOfResultWindowRes(STaskRuntimeEnv* pRuntimeEnv, SResultRowPosition *pos, int32_t* rowCellInfoOffset) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - ASSERT(0); - - for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) { - int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId; - - /* - * ts, tag, tagprj function can not decide the output number of current query - * the number of output result is decided by main output - */ - if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) { - continue; - } - -// SResultRowEntryInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset); -// assert(pResultInfo != NULL); -// -// if (pResultInfo->numOfRes > 0) { -// return pResultInfo->numOfRes; -// } - } - - return 0; -} - static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { int32_t left = *(int32_t *)pLeft; int32_t right = *(int32_t *)pRight; @@ -381,7 +355,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe } - int64_t num = getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset); + int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset); if (num <= 0) { continue; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3e52ff1836..9c5f93613a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -239,36 +239,6 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { return true; } -static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) { - int64_t newCapacity = 0; - - // more than the capacity, reallocate the resources - if (pResultRowInfo->size < pResultRowInfo->capacity) { - return; - } - - if (pResultRowInfo->capacity > 10000) { - newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25); - } else { - newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); - } - - if (newCapacity <= pResultRowInfo->capacity) { - newCapacity += 4; - } - - char* p = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition)); - if (p == NULL) { - longjmp(env, TSDB_CODE_OUT_OF_MEMORY); - } - - pResultRowInfo->pPosition = (SResultRowPosition*)p; - - int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; - memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc); - pResultRowInfo->capacity = (int32_t)newCapacity; -} - static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t uid) { bool existed = false; @@ -306,7 +276,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR return p1 != NULL; } -SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { SFilePage* pData = NULL; // in the first scan, new space needed for results @@ -392,25 +362,22 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR } // allocate a new buffer page - prepareResultListBuffer(pResultRowInfo, pTaskInfo->env); if (pResult == NULL) { ASSERT(pSup->resultRowSize > 0); - pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); + pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize); + initResultRow(pResult); // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; - taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, - sizeof(SResultRowPosition)); + taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); } // 2. set the new time window to be the new active time window - pResultRowInfo->pPosition[pResultRowInfo->size++] = - (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; // too many time window in query - if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { + if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -1218,7 +1185,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { taosVariantDestroy(&pCtx[i].param[j].param); } - taosVariantDestroy(&pCtx[i].tag); taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); taosMemoryFree(pCtx[i].input.pData); taosMemoryFree(pCtx[i].input.pColumnDataAgg); @@ -1248,9 +1214,9 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q static bool isCachedLastQuery(STaskAttr* pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); - if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) { - continue; - } +// if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) { +// continue; +// } return false; } @@ -1300,7 +1266,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]); - +#if 0 if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG || functionId == FUNCTION_TAG_DUMMY) { continue; @@ -1311,6 +1277,8 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { } else { hasOtherFunc = true; } +#endif + } if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) { @@ -1786,41 +1754,13 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf // set the correct pointer after the memory buffer reallocated. int32_t functionId = pBInfo->pCtx[i].functionId; - +#if 0 if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) { // if (i > 0) pBInfo->pCtx[i].pTsOutput = pBInfo->pCtx[i - 1].pOutput; } - } -} +#endif -void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) { - bool needCopyTs = false; - int32_t tsNum = 0; - char* src = NULL; - for (int32_t i = 0; i < numOfOutput; i++) { - int32_t functionId = pCtx[i].functionId; - if (functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) { - needCopyTs = true; - if (i > 0 && pCtx[i - 1].functionId == FUNCTION_TS_DUMMY) { - SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data - src = pColRes->pData; - } - } else if (functionId == FUNCTION_TS_DUMMY) { - tsNum++; - } - } - - if (!needCopyTs) return; - if (tsNum < 2) return; - if (src == NULL) return; - - for (int32_t i = 0; i < numOfOutput; i++) { - int32_t functionId = pCtx[i].functionId; - if (functionId == FUNCTION_TS_DUMMY) { - SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i); - memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows); - } } } @@ -3569,7 +3509,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi offset += sizeof(int32_t); uint64_t tableGroupId = *(uint64_t*)(result + offset); - SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); + SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); if (!resultRow) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } @@ -3592,10 +3532,6 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi offset += valueLen; initResultRow(resultRow); - prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env); - // pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size; - // pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = - // (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; } @@ -3887,18 +3823,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { } } -// todo set the attribute of query scan count -static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) { - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); - if (functionId == FUNCTION_STDDEV || functionId == FUNCTION_PERCT) { - return 2; - } - } - - return 1; -} - static void destroyOperatorInfo(SOperatorInfo* pOperator) { if (pOperator == NULL) { return; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ef770e8afc..7ba37c282d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -110,9 +110,11 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo return true; } -static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { +static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; + size_t numOfGroupCols = taosArrayGetSize(pGroupCols); + for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); @@ -208,7 +210,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { for (int32_t j = 0; j < pBlock->info.rows; ++j) { // Compare with the previous row of this column, and do not set the output buffer again if they are identical. if (!pInfo->isInit) { - recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); pInfo->isInit = true; num++; continue; @@ -223,7 +225,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // The first row of a new block does not belongs to the previous existed group if (j == 0) { num++; - recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); continue; } @@ -238,7 +240,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); - recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); num = 1; } @@ -396,7 +398,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); for (int32_t j = 0; j < pBlock->info.rows; ++j) { - recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); SDataGroupInfo* pGInfo = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fe7668f370..11b50a38bc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -534,8 +534,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); } - pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose +// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose pInfo->readHandle = *readHandle; pInfo->interval = extractIntervalInfo(pTableScanNode); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5d0a0a0270..a1a7aecf5f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1,3 +1,4 @@ +#include #include "executorimpl.h" #include "functionMgt.h" #include "tdatablock.h" @@ -339,34 +340,40 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o tw->ekey -= 1; } -void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, +void doTimeWindowInterpolation(SIntervalAggOperatorInfo *pInfo, int32_t numOfExprs, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) { - SExprInfo* pExpr = pOperator->pExpr; + SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; - SqlFunctionCtx* pCtx = pInfo->pCtx; + int32_t index = 1; + for (int32_t k = 0; k < numOfExprs; ++k) { - for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { - int32_t functionId = pCtx[k].functionId; - if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) { + // todo use flag instead of function name + if (strcmp(pCtx[k].pExpr->pExpr->_function.functionName, "twa") != 0) { pCtx[k].start.key = INT64_MIN; continue; } - SColIndex* pColIndex = NULL /*&pExpr[k].base.colInfo*/; - int16_t index = pColIndex->colIndex; - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, index); +// if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) { +// pCtx[k].start.key = INT64_MIN; +// continue; +// } + + SFunctParam* pParam = &pCtx[k].param[0]; + SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId); + + ASSERT(pColInfo->info.colId == pParam->pCol->colId && curTs != windowKey); - // assert(pColInfo->info.colId == pColIndex->info.colId && curTs != windowKey); double v1 = 0, v2 = 0, v = 0; - if (prevRowIndex == -1) { - // GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pRuntimeEnv->prevRow[index]); + SGroupKeys* p = taosArrayGet(pInfo->pPrevValues, index); + GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData); } else { - GET_TYPED_DATA(v1, double, pColInfo->info.type, (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes); + GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex)); } - GET_TYPED_DATA(v2, double, pColInfo->info.type, (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes); + GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex)); +#if 0 if (functionId == FUNCTION_INTERP) { if (type == RESULT_ROW_START_INTERP) { pCtx[k].start.key = prevTs; @@ -386,6 +393,8 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } } } else if (functionId == FUNCTION_TWA) { +#endif + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; SPoint point2 = (SPoint){.key = curTs, .val = &v2}; SPoint point = (SPoint){.key = windowKey, .val = &v}; @@ -399,8 +408,13 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, pCtx[k].end.key = point.key; pCtx[k].end.val = v; } + + index += 1; } +#if 0 } +#endif + } static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) { @@ -415,53 +429,53 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in } } -static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlFunctionCtx* pCtx, int32_t pos, - int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, - STimeWindow* win) { - bool ascQuery = true; +static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, SqlFunctionCtx* pCtx, int32_t numOfExprs, int32_t pos, + SSDataBlock* pBlock, const TSKEY* tsCols, STimeWindow* win) { + int32_t numOfRows = pBlock->info.rows; + + bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); + int32_t step = 1; // GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + TSKEY curTs = tsCols[pos]; - TSKEY lastTs = 0; //*(TSKEY*)pRuntimeEnv->prevRow[0]; + + SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); + TSKEY lastTs = *(int64_t*) pTsKey->pData; // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed. // start exactly from this point, no need to do interpolation TSKEY key = ascQuery ? win->skey : win->ekey; if (key == curTs) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_START_INTERP); return true; } - if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); + if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == ( - 1) && !ascQuery))) { + setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_START_INTERP); return true; } - int32_t step = 1; // GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery)) ? lastTs : tsCols[pos - step]; - doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, - RESULT_ROW_START_INTERP); + doTimeWindowInterpolation(pInfo, numOfExprs, pBlock->pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); return true; } -static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFunctionCtx* pCtx, int32_t endRowIndex, - SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, - STimeWindow* win) { - int32_t order = TSDB_ORDER_ASC; - int32_t numOfOutput = pOperatorInfo->numOfExprs; +static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo *pInfo, SqlFunctionCtx* pCtx, int32_t numOfExprs, int32_t endRowIndex, + SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { + int32_t order = pInfo->order; TSKEY actualEndKey = tsCols[endRowIndex]; - TSKEY key = order ? win->ekey : win->skey; + TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey; // not ended in current data block, do not invoke interpolation - if ((key > blockEkey /*&& QUERY_IS_ASC_QUERY(pQueryAttr)*/) || - (key < blockEkey /*&& !QUERY_IS_ASC_QUERY(pQueryAttr)*/)) { - setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP); + if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) { + setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_END_INTERP); return false; } // there is actual end point of current time window, no interpolation need if (key == actualEndKey) { - setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_END_INTERP); return true; } @@ -470,7 +484,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun assert(nextRowIndex >= 0); TSKEY nextKey = tsCols[nextRowIndex]; - doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, actualEndKey, endRowIndex, nextKey, + doTimeWindowInterpolation(pInfo, numOfExprs, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key, RESULT_ROW_END_INTERP); return true; } @@ -542,8 +556,8 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, return startPos; } -static bool resultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) { - assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); +static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) { + ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); if (type == RESULT_ROW_START_INTERP) { return pResult->startInterp == true; } else { @@ -560,34 +574,33 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } } -static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, - SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep, - int32_t order, bool timeWindowInterpo) { - if (!timeWindowInterpo) { +static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataBlock* pBlock, int32_t numOfExprs, SqlFunctionCtx* pCtx, + SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { + if (!pInfo->timeWindowInterpo) { return; } assert(pBlock != NULL); - int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInfo->order); if (pBlock->pDataBlock == NULL) { // tscError("pBlock->pDataBlock == NULL"); return; } + // todo set the correct primary timestamp column index SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); TSKEY* tsCols = (TSKEY*)(pColInfo->pData); - bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP); + bool done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP); if (!done) { // it is not interpolated, now start to generated the interpolated value int32_t startRowIndex = startPos; - bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, - pBlock->pDataBlock, tsCols, win); + bool interp = setTimeWindowInterpolationStartTs(pInfo, pCtx, numOfExprs, startRowIndex, pBlock, tsCols, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_START_INTERP); } // point interpolation does not require the end key time window interpolation. @@ -596,29 +609,48 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc // } // interpolation query does not generate the time window end interpolation - done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); + done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP); if (!done) { int32_t endRowIndex = startPos + (forwardStep - 1) * step; - TSKEY endKey = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; + TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; bool interp = - setTimeWindowInterpolationEndTs(pOperatorInfo, pCtx, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); + setTimeWindowInterpolationEndTs(pInfo, pCtx, numOfExprs, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_END_INTERP); } } -static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowIndex, int32_t numOfCols) { - if (pDataBlock == NULL) { +static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) { + if (pBlock->pDataBlock == NULL) { return; } - for (int32_t k = 0; k < numOfCols; ++k) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, k); - memcpy(pRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); + size_t num = taosArrayGetSize(pPrevKeys); + for (int32_t k = 0; k < num; ++k) { + SColumn* pc = taosArrayGet(pCols, k); + + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId); + + SGroupKeys* pkey = taosArrayGet(pPrevKeys, k); + for(int32_t i = pBlock->info.rows - 1; i >= 0; --i) { + if (colDataIsNull_s(pColInfo, i)) { + continue; + } + + char* val = colDataGetData(pColInfo, i); + if (IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, varDataTLen(val)); + ASSERT(varDataTLen(val) <= pkey->bytes); + } else { + memcpy(pkey->pData, val, pkey->bytes); + } + + break; + } } } @@ -635,7 +667,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } int32_t step = 1; - bool ascScan = (pInfo->order == TSDB_ORDER_ASC); + + bool ascScan = (pInfo->order == TSDB_ORDER_ASC); // int32_t prevIndex = pResultRowInfo->curPos; @@ -679,48 +712,69 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ASSERT(forwardStep > 0); // prev time window not interpolation yet. - // int32_t curIndex = pResultRowInfo->curPos; + SResultRowPosition pos = {0}; + if (pInfo->timeWindowInterpo) { + pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); -#if 0 - if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) { - for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. - SResultRow* pRes = getResultRow(pResultRowInfo, j); - if (pRes->closed) { - assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); + SResultRowPosition* px = (SResultRowPosition*)pn->data; + + // do nothing + if (pn != NULL && px->pageId == pos.pageId && px->offset == pos.offset) { + + } else { + tdListAppend(pResultRowInfo->openWindow, &pos); + } + } + + if (pInfo->timeWindowInterpo) { + while (1) { + SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); + SResultRowPosition* p1 = (SResultRowPosition*)pn->data; + if (p1->pageId == pos.pageId && p1->offset == pos.offset) { + break; + } + + SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1); + if (pr->closed) { + ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)); + tdListPopHead(pResultRowInfo->openWindow); continue; } - STimeWindow w = pRes->win; - ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &w, masterScan, &pResult, tableGroupId, - pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, - pTaskInfo); + STimeWindow w = pr->win; + ret = setTimeWindowOutputBuf(pResultRowInfo, &w, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, - tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); + ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + + SGroupKeys *pTsKey = taosArrayGet(pInfo->pPrevValues, 0); + int64_t prevTs = *(int64_t*) pTsKey->pData; + doTimeWindowInterpolation(pInfo, numOfOutput, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, + w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); - doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + tdListPopHead(pResultRowInfo->openWindow); } + // todo keep current outputbuffer info to avoid repeatly set the buffer info // restore current time window - ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, tableGroupId, - pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, - pTaskInfo); + ret = setTimeWindowOutputBuf(pResultRowInfo, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - } -#endif - // window start key interpolation - doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, - pInfo->order, false); + // window start key interpolation + doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep); + } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, @@ -743,12 +797,12 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*)pos->key = pResult->win.skey; + SResKeyPos* pPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + pPos->groupId = tableGroupId; + pPos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*)pPos->key = pResult->win.skey; - taosArrayPush(pUpdated, &pos); + taosArrayPush(pUpdated, &pPos); } ekey = ascScan? nextWin.ekey:nextWin.skey; @@ -756,8 +810,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, - pInfo->order, false); + doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, @@ -765,8 +818,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } if (pInfo->timeWindowInterpo) { - int32_t rowIndex = ascScan ? (pBlock->info.rows - 1) : 0; - saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols); + int32_t rowIndex = ascScan? (pBlock->info.rows - 1) : 0; + saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols); } return pUpdated; @@ -1040,8 +1093,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } } -void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, - SOptrBasicInfo* pBinfo, int32_t numOfOutput) { +void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SOptrBasicInfo* pBinfo, int32_t numOfOutput) { SResultRow* pResult = getResultRowByPos(pResultBuf, p1); SqlFunctionCtx* pCtx = pBinfo->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -1169,7 +1221,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { } } -bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { +static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { for (int32_t i = 0; i < numOfCols; i++) { if (!fmIsInvertible(pFCtx[i].functionId)) { return false; @@ -1178,6 +1230,50 @@ bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { return true; } +static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pInterpList, SArray** pPrevCols) { + // the primary timestamp column + bool needed = false; + *pInterpList = taosArrayInit(4, sizeof(SColumn)); + *pPrevCols = taosArrayInit(4, sizeof(SGroupKeys)); + + { // ts column + SColumn c = {0}; + c.colId = 1; + c.slotId = 0; + c.type = TSDB_DATA_TYPE_TIMESTAMP; + c.bytes = sizeof(int64_t); + taosArrayPush(*pInterpList, &c); + + SGroupKeys key = {0}; + key.bytes = c.bytes; + key.type = c.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, c.bytes); + taosArrayPush(*pPrevCols, &key); + } + + for(int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = pCtx[i].pExpr; + + if (strcmp(pExpr->pExpr->_function.functionName, "twa") == 0) { + SFunctParam* pParam = &pExpr->base.pParam[0]; + + SColumn c = *pParam->pCol; + taosArrayPush(*pInterpList, &c); + needed = true; + + SGroupKeys key = {0}; + key.bytes = c.bytes; + key.type = c.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, c.bytes); + taosArrayPush(*pPrevCols, &key); + } + } + + return needed; +} + SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { @@ -1187,11 +1283,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; + pInfo->win = pTaskInfo->window; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; - pInfo->win = pTaskInfo->window; - pInfo->twAggSup = *pTwAggSupp; + pInfo->twAggSup = *pTwAggSupp; + pInfo->primaryTsIndex = primaryTsSlotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -1201,23 +1298,30 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); + pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API - if (code != TSDB_CODE_SUCCESS) { + pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, &pInfo->pInterpCols, &pInfo->pPrevValues); + if (pInfo->timeWindowInterpo) { + pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + } + + // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { goto _error; } initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - pOperator->name = "TimeIntervalAggOperator"; + pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1704,8 +1808,8 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB forwardStep = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, - pInfo->order, false); + // disable it temporarily +// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); @@ -2082,7 +2186,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes } if (pWinInfo->pos.pageId == -1) { - *pResult = getNewResultRow_rv(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize); + *pResult = getNewResultRow(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize); if (*pResult == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index cac86be917..68b83f4a19 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -140,6 +140,10 @@ bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) int32_t uniqueFunction(SqlFunctionCtx *pCtx); //int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t twaFunction(SqlFunctionCtx *pCtx); +int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/inc/taggfunction.h b/source/libs/function/inc/taggfunction.h index d779cf50f4..c3d61d426d 100644 --- a/source/libs/function/inc/taggfunction.h +++ b/source/libs/function/inc/taggfunction.h @@ -52,13 +52,6 @@ typedef struct SInterpInfoDetail { int8_t primaryCol; } SInterpInfoDetail; -typedef struct STwaInfo { - int8_t hasResult; // flag to denote has value - double dOutput; - SPoint1 p; - STimeWindow win; -} STwaInfo; - bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval); /** diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index c2b1fcd549..85e0d1d521 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -18,7 +18,6 @@ #include "querynodes.h" #include "scalar.h" #include "taoserror.h" -#include "tdatablock.h" static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) { va_list vArgList; @@ -314,7 +313,7 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len pValue->notReserved = true; - uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; if (!IS_INTEGER_TYPE(paraType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } @@ -1119,6 +1118,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = firstLastFinalize, .combineFunc = lastCombine, }, + { + .name = "twa", + .type = FUNCTION_TYPE_TWA, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateInNumOutDou, + .getEnvFunc = getTwaFuncEnv, + .initFunc = twaFunctionSetup, + .processFunc = twaFunction, + .finalizeFunc = twaFinalize + }, { .name = "histogram", .type = FUNCTION_TYPE_HISTOGRAM, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e2ee6cc6fe..78e057f7a8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -225,7 +225,6 @@ typedef struct SUniqueInfo { int32_t numOfPoints; uint8_t colType; int16_t colBytes; - bool hasNull; //null is not hashable, handle separately SHashObj *pHash; char pItems[]; } SUniqueInfo; @@ -300,7 +299,7 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - //pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; + pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; char* in = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); @@ -829,8 +828,10 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - int32_t type = pInput->pData[0]->info.type; - SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t type = pInput->pData[0]->info.type; + SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + if (IS_INTEGER_TYPE(type)) { pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count); } else { @@ -1827,7 +1828,7 @@ bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultI } int32_t percentileFunction(SqlFunctionCtx* pCtx) { - int32_t notNullElems = 0; + int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -1905,11 +1906,11 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } char* data = colDataGetData(pCol, i); - notNullElems += 1; + numOfElems += 1; tMemBucketPut(pInfo->pMemBucket, data, 1); } - SET_VAL(pResInfo, notNullElems, 1); + SET_VAL(pResInfo, numOfElems, 1); } return TSDB_CODE_SUCCESS; @@ -1983,7 +1984,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult } int32_t apercentileFunction(SqlFunctionCtx* pCtx) { - int32_t notNullElems = 0; + int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -2000,7 +2001,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pCol->nullbitmap, i)) { continue; } - notNullElems += 1; + numOfElems += 1; char* data = colDataGetData(pCol, i); double v = 0; // value @@ -2013,7 +2014,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { if (colDataIsNull_f(pCol->nullbitmap, i)) { continue; } - notNullElems += 1; + numOfElems += 1; char* data = colDataGetData(pCol, i); double v = 0; @@ -2022,7 +2023,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { } } - SET_VAL(pResInfo, notNullElems, 1); + SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -3690,7 +3691,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { TSKEY* tsList = (int64_t*)pInput->pPTS->pData; SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; int32_t startOffset = pCtx->offset; @@ -3713,24 +3713,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { return pInfo->numSampled; } -//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { -// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); -// SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// int32_t slotId = pCtx->pExpr->base.resSchema.slotId; -// SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); -// -// //int32_t currentRow = pBlock->info.rows; -// pResInfo->numOfRes = pInfo->numSampled; -// -// for (int32_t i = 0; i < pInfo->numSampled; ++i) { -// colDataAppend(pCol, i, pInfo->data + i * pInfo->colBytes, false); -// //TODO: handle ts output -// } -// -// return pResInfo->numOfRes; -//} - - bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); @@ -3806,7 +3788,6 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) { TSKEY* tsList = (int64_t*)pInput->pPTS->pData; SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; int32_t startOffset = pCtx->offset; @@ -3873,7 +3854,6 @@ bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->numOfPoints = 0; pInfo->colType = pCtx->resDataInfo.type; pInfo->colBytes = pCtx->resDataInfo.bytes; - pInfo->hasNull = false; if (pInfo->pHash != NULL) { taosHashClear(pInfo->pHash); } else { @@ -3887,11 +3867,10 @@ static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) { if (isNull == true) { int32_t size = sizeof(SUniqueItem) + pInfo->colBytes; SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size); - if (pInfo->hasNull == false && pItem->isNull == false) { + if (pItem->isNull == false) { pItem->timestamp = ts; pItem->isNull = true; pInfo->numOfPoints++; - pInfo->hasNull = true; } else if (pItem->timestamp > ts && pItem->isNull == true) { pItem->timestamp = ts; } @@ -3911,8 +3890,6 @@ static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) { } else if (pHashItem->timestamp > ts) { pHashItem->timestamp = ts; } - - return; } int32_t uniqueFunction(SqlFunctionCtx* pCtx) { @@ -3954,7 +3931,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) { int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); @@ -3967,3 +3944,325 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +typedef struct STwaInfo { + double dOutput; + int8_t hasResult; // flag to denote has value + SPoint1 p; + STimeWindow win; +} STwaInfo; + +bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(STwaInfo); + return true; +} + +bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->p.key = INT64_MIN; + pInfo->win = TSWINDOW_INITIALIZER; + return true; +} + +static double twa_get_area(SPoint1 s, SPoint1 e) { + if ((s.val >= 0 && e.val >= 0)|| (s.val <=0 && e.val <= 0)) { + return (s.val + e.val) * (e.key - s.key) / 2; + } + + double x = (s.key * e.val - e.key * s.val)/(e.val - s.val); + double val = (s.val * (x - s.key) + e.val * (e.key - x)) / 2; + return val; +} + +int32_t twaFunction(SqlFunctionCtx* pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + int32_t numOfElems = 0; + + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SPoint1* last = &pInfo->p; + + int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + int32_t i = pInput->startRowIndex; + for (i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (!colDataIsNull_f(pInputCol->nullbitmap, i)) { + break; + } + } + + if (pCtx->start.key != INT64_MIN) { + ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) || + (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC)); + + ASSERT(last->key == INT64_MIN); + last->key = tsList[i]; + + GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + + pInfo->dOutput += twa_get_area(pCtx->start, *last); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pCtx->start.key; + numOfElems++; + i += step; + } else if (pInfo->p.key == INT64_MIN) { + last->key = tsList[i]; + GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = last->key; + numOfElems++; + i += step; + } + + // calculate the value of + switch(pInputCol->info.type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t *val = (int8_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *val = (int16_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *val = (int32_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *val = (int64_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = (double)val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *val = (float*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = (double)val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *val = (double*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *val = (uint8_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *val = (uint16_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *val = (uint32_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t *val = (uint64_t*) colDataGetData(pInputCol, 0); + for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + +#ifndef _TD_NINGSI_60 + SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; +#else + SPoint1 st; + st.key = tsList[i]; + st.val = (double) val[i]; +#endif + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; + } + break; + } + + default: assert(0); + } + + // the last interpolated time window value + if (pCtx->end.key != INT64_MIN) { + pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end); + pInfo->p = pCtx->end; + } + + pInfo->win.ekey = pInfo->p.key; + + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +/* + * To copy the input to interResBuf to avoid the input buffer space be over writen + * by next input data. The TWA function only applies to each table, so no merge procedure + * is required, we simply copy to the resut ot interResBuffer. + */ +//void twa_function_copy(SQLFunctionCtx *pCtx) { +// assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); +// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); +// +// memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes); +// pResInfo->hasResult = ((STwaInfo *)pCtx->pInput)->hasResult; +//} + +int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + + STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); + if (pInfo->hasResult != DATA_SET_FLAG) { + pResInfo->isNullRes = 1; + } else { + // assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult); + if (pInfo->win.ekey == pInfo->win.skey) { + pInfo->dOutput = pInfo->p.val; + } else { + pInfo->dOutput = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey); + } + + pResInfo->numOfRes = 1; + } + + return functionFinalize(pCtx, pBlock); +} + diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 950655e480..e683a38cbd 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -236,7 +236,7 @@ bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) { bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) { return pEntry->initialized; } - +#if 0 int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, bool isSuperTable/*, SUdfInfo* pUdfInfo*/) { if (!isValidDataType(dataType)) { @@ -470,6 +470,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } +#endif static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (pResultInfo->initialized) { diff --git a/source/util/src/tlist.c b/source/util/src/tlist.c index 1d17b4a9e1..b1c0188051 100644 --- a/source/util/src/tlist.c +++ b/source/util/src/tlist.c @@ -95,7 +95,7 @@ SListNode *tdListPopTail(SList *list) { SListNode *tdListGetHead(SList *list) { return TD_DLIST_HEAD(list); } -SListNode *tsListGetTail(SList *list) { return TD_DLIST_TAIL(list); } +SListNode *tdListGetTail(SList *list) { return TD_DLIST_TAIL(list); } SListNode *tdListPopNode(SList *list, SListNode *node) { TD_DLIST_POP(list, node);