From 2a7e0171d04b06bc4c275aa3a70bdecdf5f78cb3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Feb 2022 17:18:56 +0800 Subject: [PATCH] [td-13039] support interval query. --- 2.0/src/query/inc/qExecutor.h | 2 +- 2.0/src/query/src/qExecutor.c | 4 +- include/util/tpagedbuf.h | 2 +- source/common/src/tep.c | 1 + source/libs/executor/inc/executil.h | 1 + source/libs/executor/inc/executorimpl.h | 18 +- source/libs/executor/src/executil.c | 1 - source/libs/executor/src/executorimpl.c | 534 ++++++++++---------- source/libs/executor/src/tlinearhash.c | 54 +- source/libs/executor/test/executorTests.cpp | 164 +++++- source/libs/executor/test/lhashTests.cpp | 10 +- source/libs/function/src/taggfunction.c | 2 +- source/util/src/tpagedbuf.c | 137 +++-- 13 files changed, 585 insertions(+), 345 deletions(-) diff --git a/2.0/src/query/inc/qExecutor.h b/2.0/src/query/inc/qExecutor.h index 9c738dad98..970b826303 100644 --- a/2.0/src/query/inc/qExecutor.h +++ b/2.0/src/query/inc/qExecutor.h @@ -589,7 +589,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRunt SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); diff --git a/2.0/src/query/src/qExecutor.c b/2.0/src/query/src/qExecutor.c index ca656b81ff..490584c75a 100644 --- a/2.0/src/query/src/qExecutor.c +++ b/2.0/src/query/src/qExecutor.c @@ -2166,7 +2166,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_TimeWindow: { pRuntimeEnv->proot = - createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + createIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -6756,7 +6756,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index decf952bcd..76501b51ad 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -32,7 +32,7 @@ typedef struct SDiskbasedBuf SDiskbasedBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes typedef struct SFilePage { - int64_t num; + int32_t num; char data[]; } SFilePage; diff --git a/source/common/src/tep.c b/source/common/src/tep.c index 89d8127a63..d325f57c91 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -127,6 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;} case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;} + case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;} default: diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index e45e02cdd0..e729c868a7 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -68,6 +68,7 @@ typedef struct SResultRow { } SResultRow; typedef struct SResultRowInfo { + SResultRow *pCurResult; // current active result row info SResultRow** pResult; // result list // int16_t type:8; // data type for hash key int32_t size; // number of result set diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1289de004d..56fc1fcdb0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -453,7 +453,19 @@ typedef struct SAggSupporter { SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. } SAggSupporter; -typedef struct SOptrBasicInfo STableIntervalOperatorInfo; +typedef struct STableIntervalOperatorInfo { + SOptrBasicInfo binfo; + SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file + SGroupResInfo groupResInfo; + SInterval interval; + STimeWindow win; + int32_t precision; + bool timeWindowInterpo; + char **pRow; + SAggSupporter aggSup; + STableQueryInfo *pCurrent; + int32_t order; +} STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { SOptrBasicInfo binfo; @@ -606,8 +618,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); -SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index e2675115e0..1b901ee9f6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -54,7 +54,6 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { } int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { -// pResultRowInfo->type = type; pResultRowInfo->size = 0; pResultRowInfo->curPos = -1; pResultRowInfo->capacity = size; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6f1a05805a..bd2e0893af 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -141,19 +141,19 @@ static int32_t getExprFunctionId(SExprInfo *pExprInfo) { return 0; } -static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) { - int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); - if (pQueryAttr->interval.intervalUnit != 'n' && pQueryAttr->interval.intervalUnit != 'y') { - tw->skey += pQueryAttr->interval.sliding * factor; - tw->ekey = tw->skey + pQueryAttr->interval.interval - 1; +static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) { + int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); + if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') { + tw->skey += pInterval->sliding * factor; + tw->ekey = tw->skey + pInterval->interval - 1; return; } - int64_t key = tw->skey, interval = pQueryAttr->interval.interval; + int64_t key = tw->skey, interval = pInterval->interval; //convert key to second - key = convertTimePrecision(key, pQueryAttr->precision, TSDB_TIME_PRECISION_MILLI) / 1000; + key = convertTimePrecision(key, precision, TSDB_TIME_PRECISION_MILLI) / 1000; - if (pQueryAttr->interval.intervalUnit == 'y') { + if (pInterval->intervalUnit == 'y') { interval *= 12; } @@ -164,12 +164,12 @@ static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) { int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor); tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pQueryAttr->precision); + tw->skey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); mon = (int)(mon + interval); tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, pQueryAttr->precision); + tw->ekey = convertTimePrecision((int64_t)mktime(&tm) * 1000L, TSDB_TIME_PRECISION_MILLI, precision); tw->ekey -= 1; } @@ -179,7 +179,7 @@ static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult int32_t numOfCols, int32_t* rowCellInfoOffset); void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); -static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx); +static bool functionNeedToExecute(SqlFunctionCtx *pCtx); static void setBlockStatisInfo(SqlFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn); @@ -222,7 +222,8 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); +static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); + static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); @@ -394,6 +395,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); } + if (newCapacity == pResultRowInfo->capacity) { + newCapacity += 4; + } + char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); if (t == NULL) { longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -593,19 +598,19 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int return pResultRowInfo->pResult[pResultRowInfo->curPos]; } -static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { - getAlignQueryTimeWindow(pQueryAttr, ts, ts, pQueryAttr->window.ekey, w); +static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, TSKEY ekey, bool ascQuery) { + if (ascQuery) { + getAlignQueryTimeWindow(pInterval, precision, ts, ts, ekey, w); } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - getAlignQueryTimeWindow(pQueryAttr, ts, pQueryAttr->window.ekey, ts, w); + getAlignQueryTimeWindow(pInterval, precision, ts, ekey, ts, w); int64_t key = w->skey; while(key < ts) { // moving towards end - if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { - key = taosTimeAdd(key, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, pQueryAttr->precision); + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, precision); } else { - key += pQueryAttr->interval.sliding; + key += pInterval->sliding; } if (key >= ts) { @@ -618,39 +623,39 @@ static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWind } // get the correct time window according to the handled timestamp -static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, STaskAttr *pQueryAttr) { +static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t precision, STimeWindow* win) { STimeWindow w = {0}; if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value - getInitialStartTimeWindow(pQueryAttr, ts, &w); + getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true); - if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { - w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } else { - w.ekey = w.skey + pQueryAttr->interval.interval - 1; + w.ekey = w.skey + pInterval->interval - 1; } } else { w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win; } if (w.skey > ts || w.ekey < ts) { - if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { - w.skey = taosTimeTruncate(ts, &pQueryAttr->interval, pQueryAttr->precision); - w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + w.skey = taosTimeTruncate(ts, pInterval, precision); + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } else { int64_t st = w.skey; if (st > ts) { - st -= ((st - ts + pQueryAttr->interval.sliding - 1) / pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; + st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; } - int64_t et = st + pQueryAttr->interval.interval - 1; + int64_t et = st + pInterval->interval - 1; if (et < ts) { - st += ((ts - et + pQueryAttr->interval.sliding - 1) / pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; + st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; } w.skey = st; - w.ekey = w.skey + pQueryAttr->interval.interval - 1; + w.ekey = w.skey + pInterval->interval - 1; } } @@ -658,10 +663,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t * query border check, skey should not be bounded by the query time range, since the value skey will * be used as the time window index value. So we only change ekey of time window accordingly. */ - if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) { - w.ekey = pQueryAttr->window.ekey; - } - +// ASSERT(win->skey <= win->ekey); // todo no need this return w; } @@ -670,7 +672,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i STimeWindow w = {0}; if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value - getInitialStartTimeWindow(pQueryAttr, ts, &w); +// getInitialStartTimeWindow(pQueryAttr, ts, &w); if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; @@ -772,6 +774,37 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowI return TSDB_CODE_SUCCESS; } +static void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); + +static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_t id, STimeWindow *win, + bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, + int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) { + assert(win->skey <= win->ekey); + SResultRow *pResultRow = doSetResultOutBufByKey_rv(pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, + pTaskInfo, true, pAggSup); + + if (pResultRow == NULL) { + *pResult = NULL; + return TSDB_CODE_SUCCESS; + } + + // not assign result buffer yet, add new result buffer + if (pResultRow->pageId == -1) { // todo intermediate result size + int32_t ret = addNewWindowResultBuf(pResultRow, pBuf, (int32_t) tableGroupId, 0); + if (ret != TSDB_CODE_SUCCESS) { + return -1; + } + } + + // set time window for current result + pResultRow->win = (*win); + *pResult = pResultRow; + setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + + return TSDB_CODE_SUCCESS; +} + + static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); if (type == RESULT_ROW_START_INTERP) { @@ -873,48 +906,44 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } } -static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, STaskAttr* pQueryAttr, TSKEY lastKey) { - bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) { +static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey, bool ascQuery, bool interp) { + if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); pResultRowInfo->curPos = pResultRowInfo->size - 1; } else { int32_t step = ascQuery ? 1 : -1; - doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo); + doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp); } } -static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, - int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { +static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, + int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) { assert(startPos >= 0 && startPos < pDataBlockInfo->rows); - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - STableQueryInfo* item = pRuntimeEnv->current; int32_t num = -1; - int32_t order = pQueryAttr->order.order; int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { + if (order == TSDB_ORDER_ASC) { if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) { num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); - if (updateLastKey) { // update the last key + if (item != NULL) { item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; } } else { num = pDataBlockInfo->rows - startPos; - if (updateLastKey) { + if (item != NULL) { item->lastKey = pDataBlockInfo->window.ekey + step; } } } else { // desc if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) { num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); - if (updateLastKey) { // update the last key + if (item != NULL) { item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; } } else { num = startPos + 1; - if (updateLastKey) { + if (item != NULL) { item->lastKey = pDataBlockInfo->window.skey + step; } } @@ -924,22 +953,18 @@ static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlock return num; } -static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, - int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) { - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - bool hasAggregates = pCtx[0].isAggSet; - +static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, + int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].size = forwardStep; pCtx[k].startTs = pWin->skey; // keep it temporarialy - char* start = NULL;//pCtx[k].pInput; + int32_t startOffset = pCtx[k].startRow; + bool hasAgg = pCtx[k].isAggSet; - int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1); - if (pCtx[k].pInput != NULL) { -// pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes; - } + int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); + pCtx[k].startRow = pos; if (tsCol != NULL) { pCtx[k].ptsList = &tsCol[pos]; @@ -951,88 +976,80 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, pCtx[k].isAggSet = false; } - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { + if (functionNeedToExecute(&pCtx[k])) { pCtx[k].fpSet->addInput(&pCtx[k]); } // restore it - pCtx[k].isAggSet = hasAggregates; -// pCtx[k].pInput = start; + pCtx[k].isAggSet = hasAgg; + pCtx[k].startRow = startOffset; } } -static int32_t getNextQualifiedWindow(STaskAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, - TSKEY* primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) { - getNextTimeWindow(pQueryAttr, pNext); +static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, + TSKEY* primaryKeys, int32_t prevPosition, STableIntervalOperatorInfo* pInfo) { + int32_t order = pInfo->order; + bool ascQuery = (order == TSDB_ORDER_ASC); + + int32_t precision = pInfo->precision; + getNextTimeWindow(pInterval, precision, order, pNext); // next time window is not in current block - if ((pNext->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) || - (pNext->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) { + if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) || + (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) { return -1; } - TSKEY startKey = -1; - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { - startKey = pNext->skey; - if (startKey < pQueryAttr->window.skey) { - startKey = pQueryAttr->window.skey; - } - } else { - startKey = pNext->ekey; - if (startKey > pQueryAttr->window.skey) { - startKey = pQueryAttr->window.skey; - } - } - + TSKEY startKey = ascQuery? pNext->skey:pNext->ekey; int32_t startPos = 0; // tumbling time window query, a special case of sliding time window query - if (pQueryAttr->interval.sliding == pQueryAttr->interval.interval && prevPosition != -1) { - int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + if (pInterval->sliding == pInterval->interval && prevPosition != -1) { + int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); startPos = prevPosition + factor; } else { - if (startKey <= pDataBlockInfo->window.skey && QUERY_IS_ASC_QUERY(pQueryAttr)) { + if (startKey <= pDataBlockInfo->window.skey && ascQuery) { startPos = 0; - } else if (startKey >= pDataBlockInfo->window.ekey && !QUERY_IS_ASC_QUERY(pQueryAttr)) { + } else if (startKey >= pDataBlockInfo->window.ekey && !ascQuery) { startPos = pDataBlockInfo->rows - 1; } else { - startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQueryAttr->order.order); + startPos = binarySearchForKey((char *)primaryKeys, pDataBlockInfo->rows, startKey, order); } } /* interp query with fill should not skip time window */ - if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) { - return startPos; - } +// if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) { +// return startPos; +// } /* * This time window does not cover any data, try next time window, * this case may happen when the time window is too small */ if (primaryKeys == NULL) { - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { + if (ascQuery) { assert(pDataBlockInfo->window.skey <= pNext->ekey); } else { assert(pDataBlockInfo->window.ekey >= pNext->skey); } } else { - if (QUERY_IS_ASC_QUERY(pQueryAttr) && primaryKeys[startPos] > pNext->ekey) { + if (ascQuery && primaryKeys[startPos] > pNext->ekey) { TSKEY next = primaryKeys[startPos]; - if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { - pNext->skey = taosTimeTruncate(next, &pQueryAttr->interval, pQueryAttr->precision); - pNext->ekey = taosTimeAdd(pNext->skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + pNext->skey = taosTimeTruncate(next, pInterval, precision); + pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } else { - pNext->ekey += ((next - pNext->ekey + pQueryAttr->interval.sliding - 1)/pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; - pNext->skey = pNext->ekey - pQueryAttr->interval.interval + 1; + pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1)/pInterval->sliding) * pInterval->sliding; + pNext->skey = pNext->ekey - pInterval->interval + 1; } - } else if ((!QUERY_IS_ASC_QUERY(pQueryAttr)) && primaryKeys[startPos] < pNext->skey) { + } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) { TSKEY next = primaryKeys[startPos]; - if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { - pNext->skey = taosTimeTruncate(next, &pQueryAttr->interval, pQueryAttr->precision); - pNext->ekey = taosTimeAdd(pNext->skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + pNext->skey = taosTimeTruncate(next, pInterval, precision); + pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } else { - pNext->skey -= ((pNext->skey - next + pQueryAttr->interval.sliding - 1) / pQueryAttr->interval.sliding) * pQueryAttr->interval.sliding; - pNext->ekey = pNext->skey + pQueryAttr->interval.interval - 1; + pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + pNext->ekey = pNext->skey + pInterval->interval - 1; } } } @@ -1069,23 +1086,19 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in } } -static void saveDataBlockLastRow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock, - int32_t rowIndex) { +static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowIndex, int32_t numOfCols) { if (pDataBlock == NULL) { return; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - for (int32_t k = 0; k < pQueryAttr->numOfCols; ++k) { + for (int32_t k = 0; k < numOfCols; ++k) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); - memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); + memcpy(pRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); } } -static TSKEY getStartTsKey(STaskAttr* pQueryAttr, STimeWindow* win, const TSKEY* tsCols, int32_t rows) { +static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows, bool ascQuery) { TSKEY ts = TSKEY_INITIAL_VAL; - - bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); if (tsCols == NULL) { ts = ascQuery? win->skey : win->ekey; } else { @@ -1191,7 +1204,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - if (functionNeedToExecute(NULL, &pCtx[k])) { + if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs;// this can be set during create the struct pCtx[k].fpSet->addInput(&pCtx[k]); } @@ -1350,20 +1363,19 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun } static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, - SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { - STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - if (!pQueryAttr->timeWindowInterpo) { + SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep, int32_t order, bool timeWindowInterpo) { + if (!timeWindowInterpo) { return; } assert(pBlock != NULL); - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); if (pBlock->pDataBlock == NULL){ // tscError("pBlock->pDataBlock == NULL"); return; } + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); TSKEY *tsCols = (TSKEY *)(pColInfo->pData); @@ -1376,38 +1388,37 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); } // point interpolation does not require the end key time window interpolation. - if (pQueryAttr->pointInterpQuery) { - return; - } +// if (pointInterpQuery) { +// return; +// } // interpolation query does not generate the time window end interpolation done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); if (!done) { int32_t endRowIndex = startPos + (forwardStep - 1) * step; - TSKEY endKey = QUERY_IS_ASC_QUERY(pQueryAttr)? pBlock->info.window.ekey:pBlock->info.window.skey; + TSKEY endKey = (order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey; bool interp = setTimeWindowInterpolationEndTs(pOperatorInfo, pCtx, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pQueryAttr->numOfOutput, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP); } } static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; - STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - int32_t numOfOutput = pOperatorInfo->numOfOutput; - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + int32_t numOfOutput = pOperatorInfo->numOfOutput; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); - bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); + int32_t step = 1; + bool ascQuery = true; int32_t prevIndex = pResultRowInfo->curPos; @@ -1420,26 +1431,26 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery); - STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); + STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->precision, &pInfo->win); + bool masterScan = true; SResultRow* pResult = NULL; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, - numOfOutput, pInfo->rowCellInfoOffset); + int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } int32_t forwardStep = 0; - TSKEY ekey = reviseWindowEkey(pQueryAttr, &win); + TSKEY ekey = win.ekey; forwardStep = - getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); + getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); // prev time window not interpolation yet. int32_t curIndex = pResultRowInfo->curPos; - if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { + if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) { for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. SResultRow* pRes = getResultRow(pResultRowInfo, j); if (pRes->closed) { @@ -1448,64 +1459,64 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } STimeWindow w = pRes->win; - ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, - tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, + tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1, + doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); - doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); + doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } // restore current time window - ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, - numOfOutput, pInfo->rowCellInfoOffset); + ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } // window start key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); - doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); + doApplyFunctions(pInfo->binfo.pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(pQueryAttr, &nextWin, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo); if (startPos < 0) { break; } // null data, failed to allocate more memory buffer - int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId, - pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + int32_t code = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &nextWin, masterScan, &pResult, tableGroupId, + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, pInfo->pResultBuf, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - ekey = reviseWindowEkey(pQueryAttr, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); + ekey = nextWin.ekey;//reviseWindowEkey(pQueryAttr, &nextWin); + forwardStep = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); - doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); + doApplyFunctions(pInfo->binfo.pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } - if (pQueryAttr->timeWindowInterpo) { + if (pInfo->timeWindowInterpo) { int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; - saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); + saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols); } - updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); +// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); } @@ -1528,7 +1539,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe } int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery); STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); @@ -1541,25 +1552,25 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe while (1) { // null data, failed to allocate more memory buffer ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, - tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } TSKEY ekey = reviseWindowEkey(pQueryAttr, &win); - forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); +// forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); - doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); +// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep); +// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); preWin = win; int32_t prevEndPos = (forwardStep - 1) * step + startPos; - startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); +// startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); if (startPos < 0) { if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) { int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, - pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1567,8 +1578,8 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe startPos = pSDataBlock->info.rows - 1; // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); - doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); +// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep); +// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); } break; @@ -1578,10 +1589,10 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe if (pQueryAttr->timeWindowInterpo) { int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; - saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); +// saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); } - updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); +// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } @@ -1643,7 +1654,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); num = 1; memcpy(pInfo->prevData, val, bytes); @@ -1662,7 +1673,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); } tfree(pInfo->prevData); @@ -1712,8 +1723,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, - pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, +// pSDataBlock->info.rows, pOperator->numOfOutput); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1733,8 +1744,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, - pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, +// pSDataBlock->info.rows, pOperator->numOfOutput); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -1808,7 +1819,7 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD return -1; } -static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx) { +static bool functionNeedToExecute(SqlFunctionCtx *pCtx) { struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); // in case of timestamp column, always generated results. @@ -2048,6 +2059,10 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC pCtx->resDataInfo.type = pSqlExpr->resSchema.type; pCtx->order = TSDB_ORDER_ASC; + if (i == 0) { + pCtx->functionId = FUNCTION_TS; + } + // pCtx->functionId = pSqlExpr->functionId; // pCtx->stableQuery = pQueryAttr->stableQuery; pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; @@ -2343,22 +2358,22 @@ static bool isCachedLastQuery(STaskAttr *pQueryAttr) { } ///////////////////////////////////////////////////////////////////////////////////////////// - -void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { - assert(key >= keyFirst && key <= keyLast && pQueryAttr->interval.sliding <= pQueryAttr->interval.interval); - win->skey = taosTimeTruncate(key, &pQueryAttr->interval, pQueryAttr->precision); +//todo refactor : return window +void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { + assert(key >= keyFirst && key <= keyLast && pInterval->sliding <= pInterval->interval); + win->skey = taosTimeTruncate(key, pInterval, precision); /* - * if the realSkey > INT64_MAX - pQueryAttr->interval.interval, the query duration between + * if the realSkey > INT64_MAX - pInterval->interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - if (keyFirst > (INT64_MAX - pQueryAttr->interval.interval)) { - assert(keyLast - keyFirst < pQueryAttr->interval.interval); + if (keyFirst > (INT64_MAX - pInterval->interval)) { + assert(keyLast - keyFirst < pInterval->interval); win->ekey = INT64_MAX; - } else if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { - win->ekey = taosTimeAdd(win->skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; + } else if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } else { - win->ekey = win->skey + pQueryAttr->interval.interval - 1; + win->ekey = win->skey + pInterval->interval - 1; } } @@ -2584,7 +2599,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); if (QUERY_IS_ASC_QUERY(pQueryAttr)) { - getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w); +// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w); assert(w.ekey >= pBlockInfo->window.skey); if (w.ekey < pBlockInfo->window.ekey) { @@ -2592,7 +2607,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI } while(1) { - getNextTimeWindow(pQueryAttr, &w); +// getNextTimeWindow(pQueryAttr, &w); if (w.skey > pBlockInfo->window.ekey) { break; } @@ -2603,7 +2618,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI } } } else { - getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w); +// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w); assert(w.skey <= pBlockInfo->window.ekey); if (w.skey > pBlockInfo->window.skey) { @@ -2611,7 +2626,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI } while(1) { - getNextTimeWindow(pQueryAttr, &w); +// getNextTimeWindow(pQueryAttr, &w); if (w.ekey < pBlockInfo->window.skey) { break; } @@ -3513,9 +3528,6 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt } void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { - STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; -// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t numOfOutput = pOperator->numOfOutput; // if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { // // for each group result, call the finalize function for each column @@ -3908,7 +3920,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { TSKEY sk = TMIN(win.skey, win.ekey); TSKEY ek = TMAX(win.skey, win.ekey); - getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w); +// getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w); // if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { // if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { @@ -6364,22 +6376,19 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { return NULL; } - STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; + STableIntervalOperatorInfo* pInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { // toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pIntervalInfo->pRes; + return pInfo->binfo.pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t order = pQueryAttr->order.order; - STimeWindow win = pQueryAttr->window; - +// int32_t order = pQueryAttr->order.order; +// STimeWindow win = pQueryAttr->window; SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { @@ -6391,30 +6400,30 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { break; } -// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); - hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } // restore the value - pQueryAttr->order.order = order; - pQueryAttr->window = win; +// pQueryAttr->order.order = order; +// pQueryAttr->window = win; pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pIntervalInfo->resultRowInfo); + closeAllResultRows(&pInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); + finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); + toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; + return pInfo->binfo.pRes->info.rows == 0? NULL:pInfo->binfo.pRes; } static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { @@ -6429,11 +6438,11 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { if (pOperator->status == OP_RES_TO_RETURN) { // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pIntervalInfo->pRes; + return pIntervalInfo->binfo.pRes; } STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6454,8 +6463,8 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); - hashAllIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0); + setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); + hashAllIntervalAgg(pOperator, &pIntervalInfo->binfo.resultRowInfo, pBlock, 0); } // restore the value @@ -6463,18 +6472,18 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { pQueryAttr->window = win; pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pIntervalInfo->resultRowInfo); + closeAllResultRows(&pIntervalInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); + finalizeQueryResult(pOperator, pIntervalInfo->binfo.pCtx, &pIntervalInfo->binfo.resultRowInfo, pIntervalInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->binfo.resultRowInfo); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; + return pIntervalInfo->binfo.pRes->info.rows == 0? NULL:pIntervalInfo->binfo.pRes; } static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { @@ -6490,14 +6499,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { int64_t st = taosGetTimestampUs(); // copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); } SQInfo* pQInfo = pRuntimeEnv->qinfo; pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); - return pIntervalInfo->pRes; + return pIntervalInfo->binfo.pRes; } STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6518,7 +6527,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); + setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); @@ -6530,11 +6539,11 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pIntervalInfo->pRes; + return pIntervalInfo->binfo.pRes; } static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { @@ -6548,11 +6557,11 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { if (pOperator->status == OP_RES_TO_RETURN) { // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pIntervalInfo->pRes; + return pIntervalInfo->binfo.pRes; } STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6573,7 +6582,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); + setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); @@ -6586,14 +6595,14 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { int64_t st = taosGetTimestampUs(); // copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } SQInfo* pQInfo = pRuntimeEnv->qinfo; pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); - return pIntervalInfo->pRes; + return pIntervalInfo->binfo.pRes; } static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { @@ -6645,8 +6654,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, - pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, +// pSDataBlock->info.rows, pOperator->numOfOutput); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -6667,8 +6676,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, - pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, +// pSDataBlock->info.rows, pOperator->numOfOutput); } static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { @@ -7268,27 +7277,44 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8); + initAggSup(&pInfo->aggSup, pExprInfo); + + // todo: + pInfo->order = TSDB_ORDER_ASC; + pInfo->precision = TSDB_TIME_PRECISION_MICRO; + pInfo->win.skey = INT64_MIN; + pInfo->win.ekey = INT64_MAX; + pInfo->interval.intervalUnit = 's'; + pInfo->interval.slidingUnit = 's'; + pInfo->interval.interval = 1000; + pInfo->interval.sliding = 1000; + + int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, 0, "/tmp/"); + + int32_t numOfOutput = taosArrayGetSize(pExprInfo); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); + + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TimeIntervalAggOperator"; -// pOperator->operatorType = OP_TimeWindow; + pOperator->operatorType = OP_TimeWindow; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; - pOperator->pExpr = pExpr; + pOperator->pExpr = exprArrayDup(pExprInfo); + + pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = doIntervalAgg; pOperator->cleanupFn = destroyBasicOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7296,9 +7322,9 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7369,9 +7395,9 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableTimeIntervalOperator"; @@ -7393,9 +7419,9 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "AllMultiTableTimeIntervalOperator"; @@ -7458,7 +7484,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); - getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); +// getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); pInfo->pFillInfo = taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 803ce8bba2..3a58253d81 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -44,8 +44,8 @@ typedef struct SLHashObj { * +-----------+-------+--------+ */ typedef struct SLHashNode { - int32_t keyLen; - int32_t dataLen; + uint16_t keyLen; + uint16_t dataLen; } SLHashNode; #define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode)) @@ -70,10 +70,10 @@ static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) { } static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) { - *(int32_t*) p = keyLen; - p += sizeof(int32_t); - *(int32_t*) p = size; - p += sizeof(int32_t); + *(uint16_t*) p = keyLen; + p += sizeof(uint16_t); + *(uint16_t*) p = size; + p += sizeof(uint16_t); memcpy(p, key, keyLen); p += keyLen; @@ -118,7 +118,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t } pBucket->size += 1; - printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); +// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); return TSDB_CODE_SUCCESS; } @@ -154,6 +154,14 @@ static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList); SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId); + if (pLast->num <= sizeof(SFilePage)) { + // this is empty + dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + releaseBufPage(pHashObj->pBuf, pFirst); + taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); + return; + } + char* pStart = pLast->data; int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); while (1) { @@ -162,21 +170,33 @@ static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { SLHashNode* pNode = (SLHashNode*)pStart; doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen); + setBufPageDirty(pFirst, true); + setBufPageDirty(pLast, true); + + ASSERT(pLast->num >= nodeSize + sizeof(SFilePage)); pFirst->num += nodeSize; pLast->num -= nodeSize; + pStart += nodeSize; - if (pStart - pLast->data >= pLast->num) { + if (pLast->num <= sizeof(SFilePage)) { // this is empty dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + releaseBufPage(pHashObj->pBuf, pFirst); taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); break; } nodeSize = GET_LHASH_NODE_LEN(pStart); } else { // move to the front of pLast page - memmove(pLast->data, pStart,(((char*)pLast) + pLast->num - pStart)); + if (pStart != pLast->data) { + memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart)); + setBufPageDirty(pLast, true); + } + + releaseBufPage(pHashObj->pBuf, pLast); + releaseBufPage(pHashObj->pBuf, pFirst); break; } } @@ -216,7 +236,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { taosArrayPush(pBucket->pPageIdList, &pageId); pHashObj->numOfBuckets += 1; - printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); +// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); return TSDB_CODE_SUCCESS; } @@ -281,7 +301,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data if (v >= pHashObj->numOfBuckets) { int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets); - printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId); +// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId); v = newBucketId; } @@ -305,7 +325,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2)); if (numOfBits > pHashObj->bits) { - printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); +// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); ASSERT(numOfBits == pHashObj->bits + 1); pHashObj->bits = numOfBits; } @@ -314,7 +334,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data // load all data in this bucket and check if the data needs to relocated into the new bucket SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId]; - printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); +// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); @@ -331,14 +351,14 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data if (v1 != splitBucketId) { // place it into the new bucket ASSERT(v1 == newBucketId); - printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); +// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, GET_LHASH_NODE_KEY(pNode), pNode->dataLen); doRemoveFromBucket(p, pNode, pBucket); } else { - printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); +// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); pStart += nodeSize; @@ -398,8 +418,8 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) { if (type == LINEAR_HASH_DATA) { for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { - printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, - (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); +// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, +// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); } } else { dBufPrintStatis(pHashObj->pBuf); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index c0fb899a2d..c3757134d5 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -14,6 +14,7 @@ */ #include +#include #include #include #include @@ -47,6 +48,8 @@ typedef struct SDummyInputInfo { int32_t startVal; int32_t type; int32_t numOfRowsPerPage; + int32_t numOfCols; // number of columns + int64_t tsStart; SSDataBlock* pBlock; } SDummyInputInfo; @@ -117,16 +120,96 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { return pBlock; } -SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type) { +SSDataBlock* get2ColsDummyBlock(void* param, bool* newgroup) { + SOperatorInfo* pOperator = static_cast(param); + SDummyInputInfo* pInfo = static_cast(pOperator->info); + if (pInfo->current >= pInfo->totalPages) { + return NULL; + } + + if (pInfo->pBlock == NULL) { + pInfo->pBlock = static_cast(calloc(1, sizeof(SSDataBlock))); + + pInfo->pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_TIMESTAMP; + colInfo.info.bytes = sizeof(int64_t); + colInfo.info.colId = 1; + colInfo.pData = static_cast(calloc(pInfo->numOfRowsPerPage, sizeof(int64_t))); +// colInfo.nullbitmap = static_cast(calloc(1, (pInfo->numOfRowsPerPage + 7) / 8)); + + taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo); + + SColumnInfoData colInfo1 = {0}; + colInfo1.info.type = TSDB_DATA_TYPE_INT; + colInfo1.info.bytes = 4; + colInfo1.info.colId = 2; + + colInfo1.pData = static_cast(calloc(pInfo->numOfRowsPerPage, sizeof(int32_t))); + colInfo1.nullbitmap = static_cast(calloc(1, (pInfo->numOfRowsPerPage + 7) / 8)); + + taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); + } else { + blockDataClearup(pInfo->pBlock, false); + } + + SSDataBlock* pBlock = pInfo->pBlock; + + char buf[128] = {0}; + char b1[128] = {0}; + int64_t ts = 0; + int32_t v = 0; + for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) { + SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); + + ts = (++pInfo->tsStart); + colDataAppend(pColInfo, i, reinterpret_cast(&ts), false); + + SColumnInfoData* pColInfo1 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); + if (pInfo->type == data_desc) { + v = (--pInfo->startVal); + } else if (pInfo->type == data_asc) { + v = ++pInfo->startVal; + } else if (pInfo->type == data_rand) { + v = random(); + } + + colDataAppend(pColInfo1, i, reinterpret_cast(&v), false); + +// sprintf(buf, "this is %d row", i); +// STR_TO_VARSTR(b1, buf); +// +// SColumnInfoData* pColInfo2 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); +// colDataAppend(pColInfo2, i, b1, false); + } + + pBlock->info.rows = pInfo->numOfRowsPerPage; + pBlock->info.numOfCols = 1; + + pInfo->current += 1; + + blockDataUpdateTsWindow(pBlock); + return pBlock; + +} + +SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) { SOperatorInfo* pOperator = static_cast(calloc(1, sizeof(SOperatorInfo))); pOperator->name = "dummyInputOpertor4Test"; - pOperator->exec = getDummyBlock; + + if (numOfCols == 1) { + pOperator->exec = getDummyBlock; + } else { + pOperator->exec = get2ColsDummyBlock; + } SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); pInfo->totalPages = numOfBlocks; pInfo->startVal = startVal; pInfo->numOfRowsPerPage = rowsPerPage; pInfo->type = type; + pInfo->tsStart = 1620000000000; pOperator->info = pInfo; return pOperator; @@ -357,8 +440,6 @@ TEST(testCase, external_sort_Test) { taosArrayDestroy(pOrderVal); } - - TEST(testCase, sorted_merge_Test) { srand(time(NULL)); @@ -432,4 +513,79 @@ TEST(testCase, sorted_merge_Test) { } #endif + +TEST(testCase, time_interval_Operator_Test) { + srand(time(NULL)); + + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {0}; + o.order = TSDB_ORDER_ASC; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); + SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); + exp->base.resSchema = createSchema(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, "ts"); + exp->base.pColumns = static_cast(calloc(1, sizeof(SColumn))); + exp->base.pColumns->flag = TSDB_COL_NORMAL; + exp->base.pColumns->info = (SColumnInfo) {.colId = 1, .type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = 8}; + exp->base.numOfCols = 1; + + taosArrayPush(pExprInfo, &exp); + + SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); + exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BIGINT, 8, 2, "res1"); + exp1->base.pColumns = static_cast(calloc(1, sizeof(SColumn))); + exp1->base.pColumns->flag = TSDB_COL_NORMAL; + exp1->base.pColumns->info = (SColumnInfo) {.colId = 1, .type = TSDB_DATA_TYPE_INT, .bytes = 4}; + exp1->base.numOfCols = 1; + + taosArrayPush(pExprInfo, &exp1); + + SOperatorInfo* p = createDummyOperator(1, 1, 2000, data_asc, 2); + + SExecTaskInfo ti = {0}; + SOperatorInfo* pOperator = createIntervalOperatorInfo(p, pExprInfo, &ti); + + bool newgroup = false; + SSDataBlock* pRes = NULL; + + int32_t total = 1; + + int64_t s1 = taosGetTimestampUs(); + int32_t t = 1; + + while(1) { + int64_t s = taosGetTimestampUs(); + pRes = pOperator->exec(pOperator, &newgroup); + + int64_t e = taosGetTimestampUs(); + if (t++ == 1) { + printf("---------------elapsed:%ld\n", e - s); + } + + if (pRes == NULL) { + break; + } + + SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); +// SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); + for (int32_t i = 0; i < pRes->info.rows; ++i) { +// char* p = colDataGetData(pCol2, i); + printf("%d: %ld\n", total++, ((int64_t*)pCol1->pData)[i]); +// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); + } + } + + int64_t s2 = taosGetTimestampUs(); + printf("total:%ld\n", s2 - s1); + + pOperator->cleanupFn(pOperator->info, 2); + tfree(exp); + tfree(exp1); + taosArrayDestroy(pExprInfo); + taosArrayDestroy(pOrderVal); +} + #pragma GCC diagnostic pop diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp index d0fe9c5ac0..66ef3b0877 100644 --- a/source/libs/executor/test/lhashTests.cpp +++ b/source/libs/executor/test/lhashTests.cpp @@ -28,9 +28,9 @@ TEST(testCase, linear_hash_Tests) { srand(time(NULL)); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); -#if 1 - SLHashObj* pHashObj = tHashInit(10, 128 + 8, fn, 8); - for(int32_t i = 0; i < 100; ++i) { +#if 0 + SLHashObj* pHashObj = tHashInit(256, 4096, fn, 320); + for(int32_t i = 0; i < 5000000; ++i) { int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); assert(code == 0); } @@ -46,13 +46,13 @@ TEST(testCase, linear_hash_Tests) { // } // } - tHashPrint(pHashObj, LINEAR_HASH_DATA); + tHashPrint(pHashObj, LINEAR_HASH_STATIS); tHashCleanup(pHashObj); #endif #if 0 SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK); - for(int32_t i = 0; i < 500000; ++i) { + for(int32_t i = 0; i < 1000000; ++i) { taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 0615ff9627..0c2ce821d2 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -4395,7 +4395,7 @@ SFunctionFpSet fpSet[1] = { .addInput = count_function, .finalize = doFinalizer, .combine = count_func_merge, - } + }, }; SAggFunctionInfo aggFunc[35] = {{ diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 3fe3ddccde..d2d1b5a367 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -5,12 +5,6 @@ #include "tcompression.h" #include "thash.h" -//enum { -// true = 0x1, -// BUF_PAGE_RELEASED = 0x2, -// true = 0x3, -//}; - #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) @@ -20,7 +14,7 @@ typedef struct SPageDiskInfo { } SPageDiskInfo, SFreeListItem; struct SPageInfo { - SListNode* pn; // point to list node + SListNode* pn; // point to list node struct void* pData; int64_t offset; int32_t pageId; @@ -38,7 +32,8 @@ struct SDiskbasedBuf { char* path; // file path int32_t pageSize; // current used page size int32_t inMemPages; // numOfPages that are allocated in memory - SHashObj* groupSet; // id hash table + SList* freePgList; // free page list + SHashObj* groupSet; // id hash table, todo remove it SHashObj* all; SList* lruList; void* emptyDummyIdList; // dummy id list @@ -110,6 +105,14 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { } } +static void setPageNotInBuf(SPageInfo* pPageInfo) { + pPageInfo->pData = NULL; +} + +static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { + return pageSize + POINTER_BYTES + 2; +} + /** * +--------------------------+-------------------+--------------+ * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| @@ -189,17 +192,17 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } else {// NOTE: the size may be -1, the this recycle page has not been flushed to disk yet. size = pg->length; + if (size == -1) { + printf("----\n"); + } } ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1)); char* pDataBuf = pg->pData; - memset(pDataBuf, 0, pBuf->pageSize); - - pg->pData = NULL; // this means the data is not in buffer - pg->length = size; - pg->dirty = false; + memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize)); + pg->length = size; // on disk size return pDataBuf; } @@ -214,7 +217,11 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } - return doFlushPageToDisk(pBuf, pg); + char* p = doFlushPageToDisk(pBuf, pg); + setPageNotInBuf(pg); + pg->dirty = false; + + return p; } // load file block data in disk @@ -284,12 +291,23 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { assert(pageInfo->pageId >= 0 && pageInfo->pn == pn); if (!pageInfo->used) { +// printf("%d is chosen\n", pageInfo->pageId); break; } else { - printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); +// printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); } } +// int32_t pos = listNEles(pBuf->lruList); +// SListIter iter1 = {0}; +// tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD); +// SListNode* pn1 = NULL; +// while((pn1 = tdListNext(&iter1)) != NULL) { +// SPageInfo* pageInfo = *(SPageInfo**) pn1->data; +// printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1); +// pos -= 1; +// } + return pn; } @@ -333,10 +351,6 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) { tdListPrependNode(pList, pi->pn); } -static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { - return pageSize + POINTER_BYTES + 2; -} - static SPageInfo* getPageInfoFromPayload(void* page) { int32_t offset = offsetof(SPageInfo, pData); char* p = page - offset; @@ -348,41 +362,42 @@ static SPageInfo* getPageInfoFromPayload(void* page) { int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { *pBuf = calloc(1, sizeof(SDiskbasedBuf)); - SDiskbasedBuf* pResBuf = *pBuf; - if (pResBuf == NULL) { + SDiskbasedBuf* pPBuf = *pBuf; + if (pPBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pResBuf->pageSize = pagesize; - pResBuf->numOfPages = 0; // all pages are in buffer in the first place - pResBuf->totalBufSize = 0; - pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. - pResBuf->allocateId = -1; - pResBuf->comp = true; - pResBuf->file = NULL; - pResBuf->qId = qId; - pResBuf->fileSize = 0; - pResBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem)); + pPBuf->pageSize = pagesize; + pPBuf->numOfPages = 0; // all pages are in buffer in the first place + pPBuf->totalBufSize = 0; + pPBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. + pPBuf->allocateId = -1; + pPBuf->comp = true; + pPBuf->file = NULL; + pPBuf->qId = qId; + pPBuf->fileSize = 0; + pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem)); + pPBuf->freePgList = tdListNew(POINTER_BYTES); // at least more than 2 pages must be in memory assert(inMemBufSize >= pagesize * 2); - pResBuf->lruList = tdListNew(POINTER_BYTES); + pPBuf->lruList = tdListNew(POINTER_BYTES); // init id hash table _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - pResBuf->groupSet = taosHashInit(10, fn, true, false); - pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES - pResBuf->all = taosHashInit(10, fn, true, false); + pPBuf->groupSet = taosHashInit(10, fn, true, false); + pPBuf->assistBuf = malloc(pPBuf->pageSize + 2); // EXTRA BYTES + pPBuf->all = taosHashInit(10, fn, true, false); char path[PATH_MAX] = {0}; taosGetTmpfilePath(dir, "paged-buf", path); - pResBuf->path = strdup(path); + pPBuf->path = strdup(path); - pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); + pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); -// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize, -// pResBuf->inMemPages, pResBuf->path); +// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pPBuf->pageSize, +// pPBuf->inMemPages, pPBuf->path); return TSDB_CODE_SUCCESS; } @@ -401,19 +416,29 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { } } - // register new id in this group - *pageId = (++pBuf->allocateId); + SPageInfo* pi = NULL; + if (listNEles(pBuf->freePgList) != 0) { + SListNode* pItem = tdListPopHead(pBuf->freePgList); + pi = *(SPageInfo**) pItem->data; + pi->used = true; + *pageId = pi->pageId; + tfree(pItem); + } else {// create a new pageinfo + // register new id in this group + *pageId = (++pBuf->allocateId); - // register page id info - SPageInfo* pi = registerPage(pBuf, groupId, *pageId); + // register page id info + pi = registerPage(pBuf, groupId, *pageId); + + // add to hash map + taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); + pBuf->totalBufSize += pBuf->pageSize; + } // add to LRU list assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, pi); - // add to hash map - taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); - // allocate buf if (availablePage == NULL) { pi->pData = calloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. @@ -421,11 +446,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { pi->pData = availablePage; } - pBuf->totalBufSize += pBuf->pageSize; - ((void**)pi->pData)[0] = pi; - pi->used = true; - return (void *)(GET_DATA_PAYLOAD(pi)); } @@ -467,6 +488,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { (*pi)->pData = availablePage; } + // set the ptr to the new SPageInfo ((void**)((*pi)->pData))[0] = (*pi); lruListPushFront(pBuf->lruList, *pi); @@ -551,6 +573,8 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { } tdListFree(pBuf->lruList); + tdListFree(pBuf->freePgList); + taosArrayDestroy(pBuf->emptyDummyIdList); taosArrayDestroy(pBuf->pFree); @@ -596,14 +620,15 @@ void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) { void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage) { SPageInfo* ppi = getPageInfoFromPayload(pPage); - ppi->used = false; + ppi->used = false; ppi->dirty = false; - // it is a in-memory page that has not been flushed to disk yet. - if (ppi->length != -1 && ppi->offset != -1) { - SFreeListItem item = {.length = ppi->length, .offset = ppi->offset}; - taosArrayPush(pBuf->pFree, &item); - } + // add this pageinfo into the free page info list + SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn); + tfree(ppi->pData); + tfree(pNode); + + tdListAppend(pBuf->freePgList, &ppi); } void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {