diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 78a7e58ee1..395b42aa86 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -139,18 +139,29 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { colDataAppend(pDst, 0, p, isNull); } + pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + pInfo->pRes->info.rows = 1; + if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; + } } - pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); - int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); - pInfo->pRes->info.groupId = *groupId; + if (pTableList->map != NULL) { + int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); + pInfo->pRes->info.groupId = *groupId; + } else { + ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); + STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); + pInfo->pRes->info.groupId = pKeyInfo->groupId; + } pInfo->indexOfBufferedRes += 1; - pInfo->pRes->info.rows = 1; return pInfo->pRes; } else { doSetOperatorCompleted(pOperator); @@ -182,8 +193,12 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); pInfo->pRes->info.groupId = pKeyInfo->groupId; - addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; + } } tsdbLastrowReaderClose(pInfo->pLastrowReader); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 76d853ef3e..a8ff3188c8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -65,7 +65,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } rowSize += - (numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(saveTupleData) + (numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData) return rowSize; } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 30fdbb245d..35669b3e42 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -106,7 +106,7 @@ bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); int32_t irateFunction(SqlFunctionCtx *pCtx); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx); +int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 9ca4ee8d8f..8bc8def4e0 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2233,7 +2233,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = cacheLastRowFunction, + .processFunc = cachedLastRowFunction, .finalizeFunc = firstLastFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e2288d9f70..12b796c5ca 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -80,13 +80,14 @@ typedef struct STopBotRes { } STopBotRes; typedef struct SFirstLastRes { - bool hasResult; + bool hasResult; // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, // this attribute is required - bool isNull; - int32_t bytes; - int64_t ts; - char buf[]; + bool isNull; + int32_t bytes; + int64_t ts; + STuplePos pos; + char buf[]; } SFirstLastRes; typedef struct SStddevRes { @@ -1141,8 +1142,8 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); -static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { // the data is loaded, not only the block SMA value @@ -1195,7 +1196,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->v = *(int64_t*)tval; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } else { if (IS_SIGNED_NUMERIC_TYPE(type)) { @@ -1207,7 +1208,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } @@ -1220,7 +1221,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else if (type == TSDB_DATA_TYPE_DOUBLE) { @@ -1232,7 +1233,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else if (type == TSDB_DATA_TYPE_FLOAT) { @@ -1246,7 +1247,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1271,7 +1272,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1283,7 +1284,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1302,7 +1303,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1314,7 +1315,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1333,7 +1334,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1345,7 +1346,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1364,7 +1365,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1376,7 +1377,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1397,7 +1398,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1409,7 +1410,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1428,7 +1429,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1440,7 +1441,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1459,7 +1460,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1471,7 +1472,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1490,7 +1491,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1502,7 +1503,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1522,7 +1523,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1534,7 +1535,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1553,7 +1554,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } pBuf->assign = true; } else { @@ -1565,7 +1566,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -2640,7 +2641,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) } int32_t getFirstLastInfoSize(int32_t resBytes) { - return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t) + sizeof(STuplePos); + return sizeof(SFirstLastRes) + resBytes; } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -2669,6 +2670,33 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex); } +static void saveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { + if (pCtx->subsidiaries.num <= 0) { + return; + } + + if (!pInfo->hasResult) { + doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); + } else { + doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); + } +} + +static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + if (IS_VAR_DATA_TYPE(type)) { + pInfo->bytes = varDataTLen(pData); + } + + memcpy(pInfo->buf, pData, pInfo->bytes); + pInfo->ts = currentTs; + saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + + pInfo->hasResult = true; +} + // This ordinary first function does not care if current scan is ascending order or descending order scan // the OPTIMIZED version of first function will only handle the ascending order scan int32_t firstFunction(SqlFunctionCtx* pCtx) { @@ -2680,9 +2708,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; - int32_t bytes = pInputCol->info.bytes; - pInfo->bytes = bytes; + pInfo->bytes = pInputCol->info.bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2700,8 +2726,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (blockDataOrder == TSDB_ORDER_ASC) { // filter according to current result firstly if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); - if (ts < startKey) { + if (pInfo->ts < startKey) { return TSDB_CODE_SUCCESS; } } @@ -2715,26 +2740,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - memcpy(pInfo->buf, data, bytes); - *(TSKEY*)(pInfo->buf + bytes) = cts; - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - pInfo->hasResult = true; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); - pResInfo->numOfRes = 1; + if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { + doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); break; } } @@ -2742,8 +2749,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { // in case of descending order time stamp serial, which usually happens as the results of the nest query, // all data needs to be check. if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); - if (ts < endKey) { + if (pInfo->ts < endKey) { return TSDB_CODE_SUCCESS; } } @@ -2758,24 +2764,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - memcpy(pInfo->buf, data, bytes); - *(TSKEY*)(pInfo->buf + bytes) = cts; - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - pInfo->hasResult = true; - pResInfo->numOfRes = 1; + if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { + doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); break; } } @@ -2821,26 +2811,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - memcpy(pInfo->buf, data, bytes); - *(TSKEY*)(pInfo->buf + bytes) = cts; - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - pInfo->hasResult = true; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); - pResInfo->numOfRes = 1; + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveCurrentVal(pCtx, i, cts, type, data); } + break; } } else { // descending order @@ -2853,24 +2827,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - memcpy(pInfo->buf, data, bytes); - *(TSKEY*)(pInfo->buf + bytes) = cts; - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - pInfo->hasResult = true; - pResInfo->numOfRes = 1; + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveCurrentVal(pCtx, i, cts, type, data); } break; } @@ -2885,8 +2843,9 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S int32_t start = pColInfo->startRowIndex; pOutput->bytes = pInput->bytes; - TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes); - TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); + TSKEY* tsIn = &pInput->ts; + TSKEY* tsOut = &pOutput->ts; + if (pOutput->hasResult) { if (isFirst) { if (*tsIn > *tsOut) { @@ -2898,20 +2857,12 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S } } } + *tsOut = *tsIn; memcpy(pOutput->buf, pInput->buf, pOutput->bytes); - // handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pOutput->buf + pOutput->bytes + sizeof(TSKEY)); - if (pCtx->subsidiaries.num > 0) { - if (!pOutput->hasResult) { - saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos); - } - } - pOutput->hasResult = true; + saveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput); - return; + pOutput->hasResult = true; } static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { @@ -2953,34 +2904,34 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); // handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); - setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); + setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); return pResInfo->numOfRes; } int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + // todo check for failure + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); colDataAppend(pCol, pBlock->info.rows, res, false); - // handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); - setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); + setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); taosMemoryFree(res); return 1; } +//todo rewrite: int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); @@ -2998,6 +2949,28 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return TSDB_CODE_SUCCESS; } +static void doSaveLastrow(SqlFunctionCtx *pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + if (colDataIsNull_s(pInputCol, rowIndex)) { + pInfo->isNull = true; + } else { + pInfo->isNull = false; + + if (IS_VAR_DATA_TYPE(pInputCol->info.type)) { + pInfo->bytes = varDataTLen(pData); + } + + memcpy(pInfo->buf, pData, pInfo->bytes); + } + + pInfo->ts = cts; + saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + + pInfo->hasResult = true; +} + int32_t lastRowFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; @@ -3007,12 +2980,9 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; - SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL; - TSKEY startKey = getRowPTs(pInput->pPTS, 0); TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); @@ -3022,31 +2992,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) { - if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { - pInfo->isNull = true; - } else { - pInfo->isNull = false; - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - memcpy(pInfo->buf + sizeof(TSKEY), data, bytes); - } - *(TSKEY*)(pInfo->buf) = cts; - numOfElems++; - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - pInfo->hasResult = true; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); - pResInfo->numOfRes = 1; + numOfElems++; + + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveLastrow(pCtx, data, i, cts, pInfo); } break; } @@ -3054,31 +3003,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) { - if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { - pInfo->isNull = true; - } else { - pInfo->isNull = false; - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - memcpy(pInfo->buf + sizeof(TSKEY), data, bytes); - } - *(TSKEY*)(pInfo->buf) = cts; - numOfElems++; - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - pInfo->hasResult = true; - pResInfo->numOfRes = 1; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + numOfElems++; + + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveLastrow(pCtx, data, i, cts, pInfo); } break; } @@ -3088,21 +3016,6 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - - SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); - colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull); - // handle selectivity - STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); - setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); - - return pResInfo->numOfRes; -} - bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SDiffInfo); return true; @@ -3425,7 +3338,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); } #ifdef BUF_PAGE_DEBUG qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, @@ -3449,7 +3362,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple by over writing the old data if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); } #ifdef BUF_PAGE_DEBUG qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); @@ -3466,7 +3379,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData * |(n columns, one bit for each column)| src column #1| src column #2| * +------------------------------------+--------------+--------------+ */ -void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { +void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { SFilePage* pPage = NULL; // todo refactor: move away @@ -3527,7 +3440,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS #endif } -void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { +void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); int32_t numOfCols = pCtx->subsidiaries.num; @@ -4843,7 +4756,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da if (pInfo->numSampled < pInfo->samples) { sampleAssignResult(pInfo, data, pInfo->numSampled); if (pCtx->subsidiaries.num > 0) { - saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); + doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); } pInfo->numSampled++; } else { @@ -4851,7 +4764,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da if (j < pInfo->samples) { sampleAssignResult(pInfo, data, j); if (pCtx->subsidiaries.num > 0) { - copyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); + doCopyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); } } } @@ -5993,7 +5906,7 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) { +int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -6002,9 +5915,7 @@ int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; - pInfo->bytes = bytes; // last_row function does not ignore the null value @@ -6014,28 +5925,7 @@ int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - if (colDataIsNull_s(pInputCol, i)) { - pInfo->isNull = true; - } else { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; - } - - memcpy(pInfo->buf, data, bytes); - } - - pInfo->ts = cts; - if (pCtx->subsidiaries.num > 0) { - STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); - if (!pInfo->hasResult) { - saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } else { - copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); - } - } - - pInfo->hasResult = true; + doSaveLastrow(pCtx, data, i, cts, pInfo); } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 97858f83fb..2d05f6a16a 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1993,7 +1993,8 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { SNode* pFunc = NULL; FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType && - FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType) { + FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType && + FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pFunc)->funcType) { return false; } }