From 70927458e53f0d4c118eb8b5de82d85d371f7135 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 May 2022 20:10:30 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- include/libs/function/function.h | 30 -- source/common/src/tdatablock.c | 11 +- source/libs/executor/inc/executil.h | 12 +- source/libs/executor/src/executil.c | 14 +- source/libs/executor/src/executorimpl.c | 58 ++-- source/libs/executor/src/scanoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 315 ++++++++++-------- source/libs/function/src/builtinsimpl.c | 135 ++------ source/libs/function/src/texpr.c | 14 - 9 files changed, 253 insertions(+), 340 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 01f347fd05..e8cb363e08 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -61,9 +61,6 @@ typedef struct SFileBlockInfo { #define TSDB_BLOCK_DIST_STEP_ROWS 8 #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results -#define FUNCTION_TYPE_SCALAR 1 -#define FUNCTION_TYPE_AGG 2 - #define TOP_BOTTOM_QUERY_LIMIT 100 #define FUNCTIONS_NAME_MAX_LENGTH 16 @@ -165,9 +162,6 @@ enum { typedef struct tExprNode { int32_t nodeType; union { - SSchema *pSchema;// column node - struct SVariant *pVal; // value node - struct {// function node char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor int32_t functionId; @@ -210,47 +204,23 @@ struct SScalarParam { int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, bool isSuperTable); -bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId); - void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock); bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// fill api -struct SFillInfo; -struct SFillColInfo; - typedef struct SPoint { int64_t key; void * val; } SPoint; -//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); -//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); -//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); -//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val); -//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); -// -//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, -// SInterval* pInterval, int32_t fillType, -// struct SFillColInfo* pCol, const char* id); -// -//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); -//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); -//int64_t getFillInfoStart(struct SFillInfo *pFillInfo); - int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // udf api struct SUdfInfo; -void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); -void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); - /** * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * @return error code diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 49c438d8cf..1bd35bdefe 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -354,14 +354,19 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) return -1; } - int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex; + int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex; + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index); if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { return 0; } - pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0); - pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); + TSKEY skey = *(TSKEY*)colDataGetData(pColInfoData, 0); + TSKEY ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); + + pDataBlock->info.window.skey = TMIN(skey, ekey); + pDataBlock->info.window.ekey = TMAX(skey, ekey); + return 0; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index dcd4e8f918..b8975854c9 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -80,11 +80,10 @@ typedef struct SResultRowInfo { struct SqlFunctionCtx; -size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); +size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); -int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo); void initResultRow(SResultRow *pResultRow); @@ -93,15 +92,6 @@ bool isResultRowClosed(SResultRow* pResultRow); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset); -static FORCE_INLINE SResultRow *getResultRow(SDiskbasedBuf* pBuf, SResultRowInfo *pResultRowInfo, int32_t slot) { - ASSERT(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); - SResultRowPosition* pos = &pResultRowInfo->pPosition[slot]; - - SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId); - SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); - return pRow; -} - static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId); SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b8915ae218..3f3fa9836f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -101,20 +101,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow pResultRowInfo->size = 0; } -int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { - int32_t i = 0; -// while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { -// ++i; -// } - - return i; -} - void closeAllResultRows(SResultRowInfo *pResultRowInfo) { - assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); - - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - } +// do nothing } bool isResultRowClosed(SResultRow* pRow) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9c5f93613a..23493ae647 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -345,6 +345,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the // pResultRowInfo object. if (p1 != NULL) { + + // todo pResult = getResultRowByPos(pResultBuf, p1); ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); } @@ -353,11 +355,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // 1. close current opened time window if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && pResult->offset != pResultRowInfo->cur.offset))) { - // todo extract function SResultRowPosition pos = pResultRowInfo->cur; - SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); - SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset); - closeResultRow(pRow); + SFilePage* + pPage = getBufPage(pResultBuf, pos.pageId); releaseBufPage(pResultBuf, pPage); } @@ -552,11 +552,13 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } + void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily + // todo no need this?? bool hasAgg = pCtx[k].input.colDataAggIsSet; int32_t numOfRows = pCtx[k].input.numOfRows; int32_t startOffset = pCtx[k].input.startRowIndex; @@ -576,7 +578,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); - char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); SColumnInfoData idata = {0}; idata.info.type = TSDB_DATA_TYPE_BIGINT; @@ -587,22 +590,23 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; pCtx[k].sfp.process(&tw, 1, &out); pEntryInfo->numOfRes = 1; - continue; - } - int32_t code = TSDB_CODE_SUCCESS; - if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { - code = pCtx[k].fpSet.process(&pCtx[k]); - if (code != TSDB_CODE_SUCCESS) { - qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); - taskInfo->code = code; - longjmp(taskInfo->env, code); - } - } + } else { + int32_t code = TSDB_CODE_SUCCESS; + if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { + code = pCtx[k].fpSet.process(&pCtx[k]); - // restore it - pCtx[k].input.colDataAggIsSet = hasAgg; - pCtx[k].input.startRowIndex = startOffset; - pCtx[k].input.numOfRows = numOfRows; + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + longjmp(taskInfo->env, code); + } + } + + // restore it + pCtx[k].input.colDataAggIsSet = hasAgg; + pCtx[k].input.startRowIndex = startOffset; + pCtx[k].input.numOfRows = numOfRows; + } } } @@ -741,12 +745,14 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { // todo add a dummy funtion to avoid process check - if (pCtx[k].fpSet.process != NULL) { - int32_t code = pCtx[k].fpSet.process(&pCtx[k]); - if (code != TSDB_CODE_SUCCESS) { - qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); - return code; - } + if (pCtx[k].fpSet.process == NULL) { + continue; + } + + int32_t code = pCtx[k].fpSet.process(&pCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); + return code; } } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 11b50a38bc..c84883823d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -534,8 +534,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); } -// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose + pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; +// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose pInfo->readHandle = *readHandle; pInfo->interval = extractIntervalInfo(pTableScanNode); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a1a7aecf5f..6595456c17 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,6 +12,11 @@ typedef enum SResultTsInterpType { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator); +static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); + +static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult); +static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); + /* * There are two cases to handle: * @@ -135,8 +140,10 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo // set time window for current result pResultRow->win = (*win); + *pResult = pResultRow; setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + return TSDB_CODE_SUCCESS; } @@ -164,38 +171,38 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, int16_t order, int64_t* pData) { - int32_t forwardStep = 0; + int32_t forwardRows = 0; if (order == TSDB_ORDER_ASC) { int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order); if (end >= 0) { - forwardStep = end; + forwardRows = end; if (pData[end + pos] == ekey) { - forwardStep += 1; + forwardRows += 1; } } } else { int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order); if (end >= 0) { - forwardStep = end; + forwardRows = end; if (pData[end + pos] == ekey) { - forwardStep += 1; + forwardRows += 1; } } // int32_t end = searchFn((char*)pData, pos + 1, ekey, order); // if (end >= 0) { -// forwardStep = pos - end; +// forwardRows = pos - end; // // if (pData[end] == ekey) { -// forwardStep += 1; +// forwardRows += 1; // } // } } - assert(forwardStep >= 0); - return forwardStep; + assert(forwardRows >= 0); + return forwardRows; } int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { @@ -431,10 +438,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, SqlFunctionCtx* pCtx, int32_t numOfExprs, int32_t pos, SSDataBlock* pBlock, const TSKEY* tsCols, STimeWindow* win) { - int32_t numOfRows = pBlock->info.rows; - bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); - int32_t step = 1; // GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); TSKEY curTs = tsCols[pos]; @@ -449,14 +453,15 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, S return true; } - if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == ( - 1) && !ascQuery))) { + // it is the first time window, no need to do interpolation + if (pTsKey->isNull && pos == 0) { setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_START_INTERP); - return true; + } else { + TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]); + doTimeWindowInterpolation(pInfo, numOfExprs, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key, + RESULT_ROW_START_INTERP); } - TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery)) ? lastTs : tsCols[pos - step]; - - doTimeWindowInterpolation(pInfo, numOfExprs, pBlock->pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); return true; } @@ -473,14 +478,13 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo *pInfo, Sql return false; } - // there is actual end point of current time window, no interpolation need + // there is actual end point of current time window, no interpolation needs if (key == actualEndKey) { setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_END_INTERP); return true; } - int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); - int32_t nextRowIndex = endRowIndex + step; + int32_t nextRowIndex = endRowIndex + 1; assert(nextRowIndex >= 0); TSKEY nextKey = tsCols[nextRowIndex]; @@ -575,27 +579,23 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataBlock* pBlock, int32_t numOfExprs, SqlFunctionCtx* pCtx, - SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { + SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardRows) { if (!pInfo->timeWindowInterpo) { return; } - assert(pBlock != NULL); - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInfo->order); - + ASSERT(pBlock != NULL); if (pBlock->pDataBlock == NULL) { // tscError("pBlock->pDataBlock == NULL"); return; } - // todo set the correct primary timestamp column index - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* tsCols = (TSKEY*)(pColInfo->pData); bool done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP); if (!done) { // it is not interpolated, now start to generated the interpolated value - int32_t startRowIndex = startPos; - bool interp = setTimeWindowInterpolationStartTs(pInfo, pCtx, numOfExprs, startRowIndex, pBlock, tsCols, win); + bool interp = setTimeWindowInterpolationStartTs(pInfo, pCtx, numOfExprs, startPos, pBlock, tsCols, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } @@ -611,7 +611,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataB // interpolation query does not generate the time window end interpolation done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP); if (!done) { - int32_t endRowIndex = startPos + (forwardStep - 1) * step; + int32_t endRowIndex = startPos + forwardRows - 1; TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; bool interp = @@ -654,43 +654,87 @@ static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, S } } -static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, - uint64_t tableGroupId) { +static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo, + SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) { + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - int32_t numOfOutput = pOperatorInfo->numOfExprs; + int32_t startPos = 0; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + uint64_t groupId = pBlock->info.groupId; + + SResultRow* pResult = NULL; + + while (1) { + SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); + + SResultRowPosition* p1 = (SResultRowPosition*)pn->data; + if (p->pageId == p1->pageId && p->offset == p1->offset) { + break; + } + + SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1); + ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); + + if (pr->closed) { + ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)); + tdListPopHead(pResultRowInfo->openWindow); + continue; + } + + STimeWindow w = pr->win; + int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + + SGroupKeys *pTsKey = taosArrayGet(pInfo->pPrevValues, 0); + int64_t prevTs = *(int64_t*) pTsKey->pData; + doTimeWindowInterpolation(pInfo, numOfOutput, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, + w.ekey, RESULT_ROW_END_INTERP); + + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pInfo->binfo.pCtx, numOfExprs, RESULT_ROW_START_INTERP); + + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, + pBlock->info.rows, numOfExprs, pInfo->order); + + if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { + closeResultRow(pr); + tdListPopHead(pResultRowInfo->openWindow); + } else { // the remains are can not be closed yet. + break; + } + } +} + +static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, + int32_t scanFlag) { + SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SArray* pUpdated = NULL; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { pUpdated = taosArrayInit(4, POINTER_BYTES); } - int32_t step = 1; + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - bool ascScan = (pInfo->order == TSDB_ORDER_ASC); - - // int32_t prevIndex = pResultRowInfo->curPos; - - TSKEY* tsCols = NULL; - if (pBlock->pDataBlock != NULL) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - tsCols = (int64_t*)pColDataInfo->pData; - - if (tsCols != NULL) { - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - } - } - - int32_t startPos = 0; - TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan); + int32_t startPos = 0; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + int64_t *tsCols = extractTsCol(pBlock, pInfo); + uint64_t tableGroupId = pBlock->info.groupId; + bool ascScan = (pInfo->order == TSDB_ORDER_ASC); + TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan); + SResultRow* pResult = NULL; STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win); - bool masterScan = true; - SResultRow* pResult = NULL; - int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -705,84 +749,35 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe taosArrayPush(pUpdated, &pos); } - int32_t forwardStep = 0; TSKEY ekey = ascScan? win.ekey:win.skey; - forwardStep = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); - ASSERT(forwardStep > 0); + int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + ASSERT(forwardRows > 0); // prev time window not interpolation yet. - SResultRowPosition pos = {0}; if (pInfo->timeWindowInterpo) { - pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); - SResultRowPosition* px = (SResultRowPosition*)pn->data; - - // do nothing - if (pn != NULL && px->pageId == pos.pageId && px->offset == pos.offset) { - - } else { - tdListAppend(pResultRowInfo->openWindow, &pos); - } - } - - if (pInfo->timeWindowInterpo) { - while (1) { - SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); - SResultRowPosition* p1 = (SResultRowPosition*)pn->data; - if (p1->pageId == pos.pageId && p1->offset == pos.offset) { - break; - } - - SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1); - if (pr->closed) { - ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)); - tdListPopHead(pResultRowInfo->openWindow); - continue; - } - - STimeWindow w = pr->win; - ret = setTimeWindowOutputBuf(pResultRowInfo, &w, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - - SGroupKeys *pTsKey = taosArrayGet(pInfo->pPrevValues, 0); - int64_t prevTs = *(int64_t*) pTsKey->pData; - doTimeWindowInterpolation(pInfo, numOfOutput, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, - w.ekey, RESULT_ROW_END_INTERP); - - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); - - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, - pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); - tdListPopHead(pResultRowInfo->openWindow); - } - - // todo keep current outputbuffer info to avoid repeatly set the buffer info // restore current time window - ret = setTimeWindowOutputBuf(pResultRowInfo, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // window start key interpolation - doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep); + doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardRows); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, - pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, + pBlock->info.rows, numOfOutput, pInfo->order); + + doCloseWindow(pResultRowInfo, pInfo, pResult); STimeWindow nextWin = win; while (1) { - int32_t prevEndPos = (forwardStep - 1) * step + startPos; + int32_t prevEndPos = forwardRows - 1 + startPos; startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); if (startPos < 0) { break; @@ -790,7 +785,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // null data, failed to allocate more memory buffer int32_t code = - setTimeWindowOutputBuf(pResultRowInfo, &nextWin, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -806,24 +801,61 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } ekey = ascScan? nextWin.ekey:nextWin.skey; - forwardStep = + forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); // window start(end) key interpolation - doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep); + doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, - pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, + pBlock->info.rows, numOfOutput, pInfo->order); + doCloseWindow(pResultRowInfo, pInfo, pResult); } if (pInfo->timeWindowInterpo) { - int32_t rowIndex = ascScan? (pBlock->info.rows - 1) : 0; saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols); } return pUpdated; - // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); +} + +void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) { + // current result is done in computing final results. + if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { + closeResultRow(pResult); + tdListPopHead(pResultRowInfo->openWindow); + } +} + +SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) { + SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); + if (pn == NULL) { + tdListAppend(pResultRowInfo->openWindow, &pos); + return pos; + } + + SResultRowPosition* px = (SResultRowPosition*)pn->data; + if (px->pageId != pos.pageId || px->offset != pos.offset) { + tdListAppend(pResultRowInfo->openWindow, &pos); + } + + return pos; +} + +int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { + TSKEY* tsCols = NULL; + if (pBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + + if (tsCols != NULL) { + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + } + } + + return tsCols; } static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { @@ -852,7 +884,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag); #if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ @@ -983,8 +1015,9 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { } SStateWindowOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOptrBasicInfo* pBInfo = &pInfo->binfo; + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -1165,7 +1198,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { break; } - // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the + // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); @@ -1180,7 +1213,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } - pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN); } finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); @@ -1230,26 +1263,26 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { return true; } -static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pInterpList, SArray** pPrevCols) { +static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) { // the primary timestamp column bool needed = false; - *pInterpList = taosArrayInit(4, sizeof(SColumn)); - *pPrevCols = taosArrayInit(4, sizeof(SGroupKeys)); + pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys)); { // ts column SColumn c = {0}; c.colId = 1; - c.slotId = 0; + c.slotId = pInfo->primaryTsIndex; c.type = TSDB_DATA_TYPE_TIMESTAMP; c.bytes = sizeof(int64_t); - taosArrayPush(*pInterpList, &c); + taosArrayPush(pInfo->pInterpCols, &c); SGroupKeys key = {0}; key.bytes = c.bytes; key.type = c.type; - key.isNull = false; + key.isNull = true; // to denote no value is assigned yet key.pData = taosMemoryCalloc(1, c.bytes); - taosArrayPush(*pPrevCols, &key); + taosArrayPush(pInfo->pPrevValues, &key); } for(int32_t i = 0; i < numOfCols; ++i) { @@ -1259,7 +1292,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArr SFunctParam* pParam = &pExpr->base.pParam[0]; SColumn c = *pParam->pCol; - taosArrayPush(*pInterpList, &c); + taosArrayPush(pInfo->pInterpCols, &c); needed = true; SGroupKeys key = {0}; @@ -1267,7 +1300,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArr key.type = c.type; key.isNull = false; key.pData = taosMemoryCalloc(1, c.bytes); - taosArrayPush(*pPrevCols, &key); + taosArrayPush(pInfo->pPrevValues, &key); } } @@ -1302,7 +1335,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API - pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, &pInfo->pInterpCols, &pInfo->pPrevValues); + pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo); if (pInfo->timeWindowInterpo) { pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); } @@ -1780,7 +1813,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB bool ascScan = true; TSKEY* tsCols = NULL; SResultRow* pResult = NULL; - int32_t forwardStep = 0; + int32_t forwardRows = 0; if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); @@ -1805,15 +1838,15 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; *(int64_t*)pos->key = pResult->win.skey; taosArrayPush(pUpdated, &pos); - forwardStep = + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); // window start(end) key interpolation // disable it temporarily -// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep); +// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); - int32_t prevEndPos = (forwardStep - 1) * step + startPos; + int32_t prevEndPos = (forwardRows - 1) * step + startPos; startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); if (startPos < 0) { break; @@ -2321,7 +2354,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // window start(end) key interpolation - // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, + // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows, // pInfo->order, false); int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap); if (winNum > 0) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 78e057f7a8..6694a8cdb3 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3946,7 +3946,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { typedef struct STwaInfo { double dOutput; - int8_t hasResult; // flag to denote has value SPoint1 p; STimeWindow win; } STwaInfo; @@ -3977,28 +3976,25 @@ static double twa_get_area(SPoint1 s, SPoint1 e) { return val; } +#define INIT_INTP_POINT(_p, _k, _v) \ + do { \ + (_p).key = (_k); \ + (_p).val = (_v); \ + } while (0) + int32_t twaFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t numOfElems = 0; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SPoint1 *last = &pInfo->p; + int32_t numOfElems = 0; - SPoint1* last = &pInfo->p; - - int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t i = pInput->startRowIndex; - for (i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) { - if (!colDataIsNull_f(pInputCol->nullbitmap, i)) { - break; - } - } - if (pCtx->start.key != INT64_MIN) { ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) || (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC)); @@ -4009,37 +4005,30 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); pInfo->dOutput += twa_get_area(pCtx->start, *last); - - pInfo->hasResult = DATA_SET_FLAG; pInfo->win.skey = pCtx->start.key; numOfElems++; - i += step; + i += 1; } else if (pInfo->p.key == INT64_MIN) { last->key = tsList[i]; GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); - pInfo->hasResult = DATA_SET_FLAG; pInfo->win.skey = last->key; numOfElems++; - i += step; + i += 1; } + SPoint1 st = {0}; + // calculate the value of switch(pInputCol->info.type) { case TSDB_DATA_TYPE_TINYINT: { int8_t *val = (int8_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4048,18 +4037,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_SMALLINT: { int16_t *val = (int16_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4067,18 +4050,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_INT: { int32_t *val = (int32_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4086,18 +4063,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_BIGINT: { int64_t *val = (int64_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = (double)val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4105,18 +4076,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_FLOAT: { float *val = (float*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = (double)val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4124,18 +4089,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_DOUBLE: { double *val = (double*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4143,18 +4102,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_UTINYINT: { uint8_t *val = (uint8_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4162,18 +4115,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_USMALLINT: { uint16_t *val = (uint16_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4181,18 +4128,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_UINT: { uint32_t *val = (uint32_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } @@ -4200,25 +4141,19 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_UBIGINT: { uint64_t *val = (uint64_t*) colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { + for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) { continue; } -#ifndef _TD_NINGSI_60 - SPoint1 st = {.key = tsList[i], .val = (double) val[i]}; -#else - SPoint1 st; - st.key = tsList[i]; - st.val = (double) val[i]; -#endif + INIT_INTP_POINT(st, tsList[i], val[i]); pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->p = st; } break; } - default: assert(0); + default: ASSERT(0); } // the last interpolated time window value @@ -4229,7 +4164,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { pInfo->win.ekey = pInfo->p.key; - SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); + SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -4250,7 +4185,7 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); - if (pInfo->hasResult != DATA_SET_FLAG) { + if (pResInfo->numOfRes == 0) { pResInfo->isNullRes = 1; } else { // assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult); diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c index b91af2d157..703b19ced7 100644 --- a/source/libs/function/src/texpr.c +++ b/source/libs/function/src/texpr.c @@ -36,12 +36,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) { if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) { doExprTreeDestroy(&pNode, fp); - } else if (pNode->nodeType == TEXPR_VALUE_NODE) { - taosVariantDestroy(pNode->pVal); - } else if (pNode->nodeType == TEXPR_COL_NODE) { - taosMemoryFreeClear(pNode->pSchema); } - taosMemoryFree(pNode); } @@ -49,15 +44,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) { if (*pExpr == NULL) { return; } - - int32_t type = (*pExpr)->nodeType; - if (type == TEXPR_VALUE_NODE) { - taosVariantDestroy((*pExpr)->pVal); - taosMemoryFree((*pExpr)->pVal); - } else if (type == TEXPR_COL_NODE) { - taosMemoryFree((*pExpr)->pSchema); - } - taosMemoryFree(*pExpr); *pExpr = NULL; }