diff --git a/include/common/tcommon.h b/include/common/tcommon.h index ccad689710..a66e670bf0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -107,6 +107,9 @@ typedef struct SFirstLastRes { bool isNull; int32_t bytes; int64_t ts; + char* pkData; + int32_t pkBytes; + int8_t pkType; STuplePos pos; char buf[]; } SFirstLastRes; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0fa84c99c6..4ee0989118 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -118,6 +118,7 @@ typedef struct SInputColumnInfoData { int32_t numOfInputCols; // PTS is not included bool colDataSMAIsSet; // if agg is set or not SColumnInfoData *pPTS; // primary timestamp column + SColumnInfoData *pPrimaryKey; // primary key column SColumnInfoData **pData; SColumnDataAgg **pColumnDataAgg; uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions. @@ -180,6 +181,43 @@ typedef struct SFunctionStateStore { int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen); } SFunctionStateStore; +typedef struct SFuncInputRow { + TSKEY ts; + bool isDataNull; + char* pData; + + SSDataBlock* block; // prev row block or src block + int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock + + //TODO: + // int32_t startOffset; // for diff, derivative + // SPoint1 startPoint; // for twa +} SFuncInputRow; + +typedef struct SFuncInputRowIter { + bool hasPrev; + + SInputColumnInfoData* pInput; + SColumnInfoData* pData; + TSKEY* tsList; + int32_t rowIndex; + int32_t inputEndIndex; + SSDataBlock* pSrcBlock; + + TSKEY prevBlockTsEnd; + bool prevIsDataNull; + char* pPrevData; + SSDataBlock* pPrevRowBlock; // pre one row block + + //TODO: + // int32_t prevStartOffset; // for diff, derivative. + // SPoint1 prevStartPoint; // for twa. + // int32_t startOffset; // for diff, derivative. + // SPoint1 startPoint; // for twa. + + bool finalRow; +} SFuncInputRowIter; + // sql function runtime context typedef struct SqlFunctionCtx { SInputColumnInfoData input; @@ -209,6 +247,9 @@ typedef struct SqlFunctionCtx { int32_t exprIdx; char *udfName; SFunctionStateStore *pStore; + bool hasPrimaryKey; + SFuncInputRowIter rowIter; + bool bInputFinished; } SqlFunctionCtx; typedef struct tExprNode { diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 3836c631d5..f126bb587a 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -240,8 +240,9 @@ bool fmIsGroupKeyFunc(int32_t funcId); bool fmIsBlockDistFunc(int32_t funcId); bool fmIsConstantResFunc(SFunctionNode* pFunc); bool fmIsSkipScanCheckFunc(int32_t funcId); +bool fmIsPrimaryKeyFunc(int32_t funcId); -void getLastCacheDataType(SDataType* pType); +void getLastCacheDataType(SDataType* pType, int32_t pkBytes); SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 97ac4ff3b9..53e931087e 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -75,6 +75,7 @@ typedef struct SColumnNode { SExprNode node; // QUERY_NODE_COLUMN uint64_t tableId; int8_t tableType; + bool tableHasPk; col_id_t colId; uint16_t projIdx; // the idx in project list, start from 1 EColumnType colType; // column or tag @@ -163,6 +164,8 @@ typedef struct SFunctionNode { int32_t funcType; SNodeList* pParameterList; int32_t udfBufSize; + bool hasPk; + int32_t pkBytes; } SFunctionNode; typedef struct STableNode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index aa906c38f4..b2ebd34255 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -760,6 +760,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT TAOS_DEF_ERROR_CODE(0, 0x2670) #define TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x2671) #define TSDB_CODE_PAR_SECOND_COL_PK TAOS_DEF_ERROR_CODE(0, 0x2672) +#define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 76dc622cfd..b6a2156bc6 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -303,6 +303,12 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int pInput->colDataSMAIsSet = false; SExprInfo* pOneExpr = &pExprSup->pExprInfo[i]; + bool hasPk = pOneExpr->pExpr->nodeType == QUERY_NODE_FUNCTION && pOneExpr->pExpr->_function.pFunctNode->hasPk; + pCtx[i].hasPrimaryKey = hasPk; + + int16_t tsParamIdx = (!hasPk) ? pOneExpr->base.numOfParams - 1 : pOneExpr->base.numOfParams - 2; + int16_t pkParamIdx = pOneExpr->base.numOfParams - 1; + for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { @@ -315,9 +321,13 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int // NOTE: the last parameter is the primary timestamp column // todo: refactor this - if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { + + if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == tsParamIdx)) { pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. } + if (hasPk && (j == pkParamIdx)) { + pInput->pPrimaryKey = pInput->pData[j]; + } ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { // todo avoid case: top(k, 12), 12 is the value parameter. diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index ba7bf72aea..ccb50ff2a8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -157,7 +157,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -int32_t getFirstLastInfoSize(int32_t resBytes); +int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes); EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow); EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow); diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index 30bd38e7ba..5b51022f68 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -53,6 +53,7 @@ extern "C" { #define FUNC_MGT_GEOMETRY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(24) #define FUNC_MGT_FORBID_SYSTABLE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(25) #define FUNC_MGT_SKIP_SCAN_CHECK_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(26) +#define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(27) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b4af77fafc..cb2a979909 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1782,9 +1782,9 @@ static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32 return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } - + int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0; pFunc->node.resType = - (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + (SDataType){.bytes = getFirstLastInfoSize(paraBytes, pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; } else { if (TSDB_DATA_TYPE_BINARY != paraType) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -2753,7 +2753,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "interp", .type = FUNCTION_TYPE_INTERP, .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateInterp, .getEnvFunc = getSelectivityFuncEnv, .initFunc = functionSetup, @@ -2765,7 +2765,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "derivative", .type = FUNCTION_TYPE_DERIVATIVE, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateDerivative, .getEnvFunc = getDerivativeFuncEnv, .initFunc = derivativeFuncSetup, @@ -2778,7 +2778,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "irate", .type = FUNCTION_TYPE_IRATE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateIrate, .getEnvFunc = getIrateFuncEnv, .initFunc = irateFuncSetup, @@ -2792,7 +2792,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_irate_partial", .type = FUNCTION_TYPE_IRATE_PARTIAL, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateIratePartial, .getEnvFunc = getIrateFuncEnv, .initFunc = irateFuncSetup, @@ -2815,7 +2815,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2852,7 +2852,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_last_row_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastPartial, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2864,7 +2864,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_last_row_merge", .type = FUNCTION_TYPE_LAST_MERGE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2875,7 +2875,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "first", .type = FUNCTION_TYPE_FIRST, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = firstDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2891,7 +2891,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_first_partial", .type = FUNCTION_TYPE_FIRST_PARTIAL, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastPartial, .dynDataRequiredFunc = firstDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2904,7 +2904,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_first_merge", .type = FUNCTION_TYPE_FIRST_MERGE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2916,7 +2916,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "last", .type = FUNCTION_TYPE_LAST, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2932,7 +2932,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_last_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastPartial, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2945,7 +2945,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_last_merge", .type = FUNCTION_TYPE_LAST_MERGE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2957,7 +2957,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "twa", .type = FUNCTION_TYPE_TWA, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | - FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateInNumOutDou, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getTwaFuncEnv, @@ -3060,7 +3060,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "diff", .type = FUNCTION_TYPE_DIFF, .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, @@ -3144,7 +3144,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "unique", .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateUnique, .getEnvFunc = getUniqueFuncEnv, .initFunc = uniqueFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5ab6d5e075..6dc8f3b570 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -418,6 +418,209 @@ typedef struct SGroupKeyInfo { (_p).val = (_v); \ } while (0) +int32_t funcInputUpdate(SqlFunctionCtx* pCtx) { + SFuncInputRowIter* pIter = &pCtx->rowIter; + + if (!pCtx->bInputFinished) { + pIter->pInput = &pCtx->input; + pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData; + pIter->pData = pIter->pInput->pData[0]; + pIter->rowIndex = pIter->pInput->startRowIndex; + pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1; + pIter->pSrcBlock = pCtx->pSrcBlock; + } else { + pIter->finalRow = true; + } + + return TSDB_CODE_SUCCESS; +} + +bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { + if (pIter->finalRow) { + if (pIter->hasPrev) { + pRow->ts = pIter->prevBlockTsEnd; + pRow->isDataNull = pIter->prevIsDataNull; + pRow->pData = pIter->pPrevData; + pRow->block = pIter->pPrevRowBlock; + pRow->rowIndex = 0; + + pIter->hasPrev = false; + return true; + } else { + return false; + } + } + if (pIter->hasPrev) { + if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { + blockDataDestroy(pIter->pPrevRowBlock); + pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex); + pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes); + char* srcData = colDataGetData(pIter->pData, pIter->inputEndIndex); + memcpy(pIter->pPrevData, srcData, pIter->pData->info.bytes); + pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + + pIter->hasPrev = true; + return false; + } else { + int32_t idx = pIter->rowIndex; + while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { + ++idx; + } + pRow->ts = pIter->prevBlockTsEnd; + if (idx == pIter->pInput->startRowIndex) { + pRow->isDataNull = pIter->prevIsDataNull; + pRow->pData = pIter->pPrevData; + pRow->block = pIter->pPrevRowBlock; + pRow->rowIndex = 0; + } else { + pRow->ts = pIter->tsList[idx - 1]; + pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx - 1); + pRow->pData = colDataGetData(pIter->pData, idx - 1); + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = idx - 1; + } + pIter->hasPrev = false; + pIter->rowIndex = idx; + return true; + } + } else { + TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; + if (pIter->tsList[pIter->rowIndex] != tsEnd) { + int32_t idx = pIter->rowIndex; + while (pIter->tsList[idx + 1] == pIter->tsList[pIter->rowIndex]) { + ++idx; + } + pRow->ts = pIter->tsList[idx]; + pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx); + pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = idx; + + pIter->rowIndex = idx + 1; + return true; + } else { + pIter->hasPrev = true; + pIter->prevBlockTsEnd = tsEnd; + // TODO + pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex); + pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes); + memcpy(pIter->pPrevData, colDataGetData(pIter->pData, pIter->inputEndIndex), pIter->pData->info.bytes); + pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); + return false; + } + } +} + +bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { + if (pIter->hasPrev) { + if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { + pIter->hasPrev = true; + return false; + } else { + int32_t idx = pIter->rowIndex; + while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { + ++idx; + } + pRow->ts = pIter->tsList[idx]; + pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx); + pRow->pData = colDataGetData(pIter->pData, idx); + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = idx; + + pIter->hasPrev = false; + pIter->rowIndex = idx + 1; + return true; + } + } else { + if (pIter->rowIndex <= pIter->inputEndIndex) { + pRow->ts = pIter->tsList[pIter->rowIndex]; + pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex); + pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = pIter->rowIndex; + + TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; + if (pIter->tsList[pIter->rowIndex] != tsEnd) { + int32_t idx = pIter->rowIndex + 1; + while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) { + ++idx; + } + pIter->rowIndex = idx; + } else { + pIter->rowIndex = pIter->inputEndIndex + 1; + } + return true; + } else { + TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; + pIter->hasPrev = true; + pIter->prevBlockTsEnd = tsEnd; + return false; + } + } +} + +bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { + if (pIter->rowIndex <= pIter->inputEndIndex) { + pRow->ts = pIter->tsList[pIter->rowIndex]; + pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex); + pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = pIter->rowIndex; + + ++pIter->rowIndex; + return true; + } else { + return false; + } +} + +bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { + SFuncInputRowIter* pIter = &pCtx->rowIter; + if (pCtx->hasPrimaryKey) { + if (pCtx->order == TSDB_ORDER_DESC) { + return funcInputGetNextRowDescPk(pIter, pRow); + } else { + return funcInputGetNextRowAscPk(pIter, pRow); + } + } else { + return funcInputGetNextRowNoPk(pIter, pRow); + } +} + +// This function append the selectivity to subsidiaries function context directly, without fetching data +// from intermediate disk based buf page +void appendSelectivityCols(SqlFunctionCtx* pCtx, SSDataBlock* pSrcBlock, int32_t rowIndex, int32_t pos) { + if (pCtx->subsidiaries.num <= 0) { + return; + } + + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + + // get data from source col + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + + SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); + + char* pData = colDataGetData(pSrcCol, rowIndex); + + // append to dest col + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + + SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId); + + if (colDataIsNull_s(pSrcCol, rowIndex) == true) { + colDataSetNULL(pDstCol, pos); + } else { + colDataSetVal(pDstCol, pos, pData, false); + } + } +} + +bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom); + static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst); bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { @@ -2119,11 +2322,14 @@ EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { } } -int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; } +//TODO modify it to include primary key bytes +int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes) { return sizeof(SFirstLastRes) + resBytes + pkBytes; } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = getFirstLastInfoSize(pNode->node.resType.bytes); + //TODO: change SFunctionNode to add pk info + int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0; + pEnv->calcMemSize = getFirstLastInfoSize(pNode->node.resType.bytes, pkBytes); return true; } @@ -2177,7 +2383,7 @@ static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowI return code; } -static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) { +static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, char* pkData, int32_t type, char* pData) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -2186,6 +2392,14 @@ static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t } memcpy(pInfo->buf, pData, pInfo->bytes); + if (pkData != NULL) { + if (IS_VAR_DATA_TYPE(pInfo->pkType)) { + pInfo->pkBytes = varDataTLen(pkData); + } + memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes); + pInfo->pkData = pInfo->buf + pInfo->bytes; + } + pInfo->ts = currentTs; int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); if (code != TSDB_CODE_SUCCESS) { @@ -2213,6 +2427,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } + SColumnInfoData* pkCol = pInput->pPrimaryKey; + pInfo->pkType = -1; + __compar_fn_t pkCompareFn = NULL; + if (pCtx->hasPrimaryKey) { + pInfo->pkType = pkCol->info.type; + pInfo->pkBytes = pkCol->info.bytes; + pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_ASC); + } + // All null data column, return directly. if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { @@ -2283,17 +2506,24 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } #else int64_t* pts = (int64_t*)pInput->pPTS->pData; - for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + + int from = -1; + int32_t i = -1; + while (funcInputGetNextRowIndex(pInput, from, true, &i, &from)) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { continue; } numOfElems++; - char* data = colDataGetData(pInputCol, i); + char* pkData = NULL; + if (pCtx->hasPrimaryKey) { + pkData = colDataGetData(pkCol, i); + } TSKEY cts = pts[i]; - if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { - int32_t code = doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); + if (pResInfo->numOfRes == 0 || pInfo->ts > cts || + (pInfo->ts == cts && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) { + int32_t code = doSaveCurrentVal(pCtx, i, cts, pkData, pInputCol->info.type, data); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2330,6 +2560,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } + SColumnInfoData* pkCol = pInput->pPrimaryKey; + pInfo->pkType = -1; + __compar_fn_t pkCompareFn = NULL; + if (pCtx->hasPrimaryKey) { + pInfo->pkType = pkCol->info.type; + pInfo->pkBytes = pkCol->info.bytes; + pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_DESC); + } + // All null data column, return directly. if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { @@ -2400,7 +2639,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else - if (!pInputCol->hasNull) { + if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) { numOfElems = 1; int32_t round = pInput->numOfRows >> 2; @@ -2427,7 +2666,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { char* data = colDataGetData(pInputCol, chosen); - int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data); + int32_t code = doSaveCurrentVal(pCtx, i, cts, NULL, type, data); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2438,7 +2677,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { char* data = colDataGetData(pInputCol, i); - int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); + int32_t code = doSaveCurrentVal(pCtx, i, pts[i], NULL, type, data); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2446,16 +2685,22 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } } else { - for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + int from = -1; + int32_t i = -1; + while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) { if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { continue; } numOfElems++; - - if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { + char* pkData = NULL; + if (pCtx->hasPrimaryKey) { + pkData = colDataGetData(pkCol, i); + } + if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i] || + (pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) { char* data = colDataGetData(pInputCol, i); - int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); + int32_t code = doSaveCurrentVal(pCtx, i, pts[i], pkData, type, data); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2483,14 +2728,19 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p if (!pInput->hasResult) { return TSDB_CODE_FAILED; } - + __compar_fn_t pkCompareFn = NULL; + if (pInput->pkData) { + pkCompareFn = getKeyComparFunc(pInput->pkType, (isFirst) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC); + } if (pOutput->hasResult) { if (isFirst) { - if (pInput->ts > pOutput->ts) { + if (pInput->ts > pOutput->ts || + (pInput->ts == pOutput->ts && pkCompareFn && pkCompareFn(pInput->pkData, pOutput->pkData) > 0)) { return TSDB_CODE_FAILED; } } else { - if (pInput->ts < pOutput->ts) { + if (pInput->ts < pOutput->ts || + (pInput->ts == pOutput->ts && pkCompareFn && pkCompareFn(pInput->pkData, pOutput->pkData) < 0)) { return TSDB_CODE_FAILED; } } @@ -2501,6 +2751,10 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p pOutput->bytes = pInput->bytes; memcpy(pOutput->buf, pInput->buf, pOutput->bytes); + if (pInput->pkData) { + pOutput->pkBytes = pInput->pkBytes; + memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes); + } return TSDB_CODE_SUCCESS; } @@ -2535,6 +2789,11 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer } char* data = colDataGetData(pCol, i); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); + if (pCtx->hasPrimaryKey) { + pInputInfo->pkData = pInputInfo->buf + pInputInfo->bytes; + } else { + pInputInfo->pkData = NULL; + } int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2575,7 +2834,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); - int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); + int32_t resultBytes = getFirstLastInfoSize(pRes->bytes, pRes->pkBytes); // todo check for failure char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); @@ -2613,6 +2872,8 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pkCol = pInput->pPrimaryKey; + if (colDataIsNull_s(pInputCol, rowIndex)) { pInfo->isNull = true; @@ -2626,6 +2887,14 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex memcpy(pInfo->buf, pData, pInfo->bytes); } + if (pCtx->hasPrimaryKey) { + char* pkData = colDataGetData(pkCol, rowIndex); + if (IS_VAR_DATA_TYPE(pInfo->pkType)) { + pInfo->pkBytes = varDataTLen(pkData); + } + memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes); + pInfo->pkData = pInfo->buf + pInfo->bytes; + } pInfo->ts = cts; int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); if (code != TSDB_CODE_SUCCESS) { @@ -2653,7 +2922,14 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { if (IS_NULL_TYPE(type)) { return TSDB_CODE_SUCCESS; } - + SColumnInfoData* pkCol = pInput->pPrimaryKey; + pInfo->pkType = -1; + __compar_fn_t pkCompareFn = NULL; + if (pCtx->hasPrimaryKey) { + pInfo->pkType = pkCol->info.type; + pInfo->pkBytes = pkCol->info.bytes; + pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_DESC); + } TSKEY startKey = getRowPTs(pInput->pPTS, 0); TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); @@ -2689,13 +2965,20 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { #else int64_t* pts = (int64_t*)pInput->pPTS->pData; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + int from = -1; + int32_t i = -1; + while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) { bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); char* data = isNull ? NULL : colDataGetData(pInputCol, i); TSKEY cts = pts[i]; numOfElems++; - if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + char* pkData = NULL; + if (pCtx->hasPrimaryKey) { + pkData = colDataGetData(pkCol, i); + } + if (pResInfo->numOfRes == 0 || pInfo->ts < cts || + (pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) { int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2854,13 +3137,52 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, return TSDB_CODE_SUCCESS; } +//TODO: the primary key compare can be skipped for ordered pk if knonwn before +//TODO: for desc ordered, pk shall select the smallest one for one ts. if across block boundaries. +bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom) { + if (pInput->pPrimaryKey == NULL) { + if (from == -1) { + from = pInput->startRowIndex; + } else if (from >= pInput->numOfRows + pInput->startRowIndex) { + return false; + } + *pRowIndex = from; + *nextFrom = from + 1; + return true; + } else { + if (from == -1) { + from = pInput->startRowIndex; + } else if (from >= pInput->numOfRows + pInput->startRowIndex) { + return false; + } + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + SColumnInfoData* pkCol = pInput->pPrimaryKey; + int8_t pkType = pkCol->info.type; + int32_t order = (firstOccur) ? TSDB_ORDER_ASC: TSDB_ORDER_DESC; + __compar_fn_t compareFunc = getKeyComparFunc(pkType, order); + int32_t select = from; + char* val = colDataGetData(pkCol, select); + while (from < pInput->numOfRows + pInput->startRowIndex - 1 && tsList[from + 1] == tsList[from]) { + char* val1 = colDataGetData(pkCol, from + 1); + if (compareFunc(val1, val) < 0) { + select = from + 1; + val = val1; + } + from = from + 1; + } + *pRowIndex = select; + *nextFrom = from + 1; + return true; + } +} + int32_t diffFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; + int8_t inputType = pInputCol->info.type; TSKEY* tsList = (int64_t*)pInput->pPTS->pData; @@ -2869,10 +3191,13 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + funcInputUpdate(pCtx); + + SFuncInputRow row = {0}; + while (funcInputGetNextRow(pCtx, &row)) { int32_t pos = startOffset + numOfElems; - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (row.isDataNull) { if (pDiffInfo->includeNull) { colDataSetNull_f_s(pOutput, pos); @@ -2881,24 +3206,24 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { continue; } - char* pv = colDataGetData(pInputCol, i); + char* pv = row.pData; if (pDiffInfo->hasPrev) { - if (tsList[i] == pDiffInfo->prevTs) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; + if (row.ts == pDiffInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; } - int32_t code = doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, tsList[i]); + int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, row.ts); if (code != TSDB_CODE_SUCCESS) { return code; } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); + appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); } - + numOfElems++; } else { - int32_t code = doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]); + int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, row.ts); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5196,36 +5521,35 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { goto _twa_over; } - int32_t i = pInput->startRowIndex; + funcInputUpdate(pCtx); + SFuncInputRow row = {0}; if (pCtx->start.key != INT64_MIN && last->key == INT64_MIN) { - for (; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } - last->key = tsList[i]; + last->key = row.ts; - GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData); pInfo->dOutput += twa_get_area(pCtx->start, *last); pInfo->win.skey = pCtx->start.key; numOfElems++; - i += 1; break; } } else if (pInfo->p.key == INT64_MIN) { - for (; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } - last->key = tsList[i]; + last->key = row.ts; - GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); + GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData); pInfo->win.skey = last->key; numOfElems++; - i += 1; break; } } @@ -5235,14 +5559,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { // calculate the value of switch (pInputCol->info.type) { case TSDB_DATA_TYPE_TINYINT: { - int8_t* val = (int8_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5254,14 +5577,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { } case TSDB_DATA_TYPE_SMALLINT: { - int16_t* val = (int16_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5272,14 +5594,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_INT: { - int32_t* val = (int32_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5290,14 +5611,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_BIGINT: { - int64_t* val = (int64_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5308,14 +5628,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_FLOAT: { - float* val = (float*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5326,14 +5645,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_DOUBLE: { - double* val = (double*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(double*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5344,14 +5662,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_UTINYINT: { - uint8_t* val = (uint8_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5362,14 +5679,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_USMALLINT: { - uint16_t* val = (uint16_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5380,14 +5696,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_UINT: { - uint32_t* val = (uint32_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5398,14 +5713,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } case TSDB_DATA_TYPE_UBIGINT: { - uint64_t* val = (uint64_t*)colDataGetData(pInputCol, 0); - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } numOfElems++; - INIT_INTP_POINT(st, tsList[i], val[i]); + INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData); if (pInfo->p.key == st.key) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -5707,28 +6021,27 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pTsOutput = pCtx->pTsOutput; - int32_t i = pInput->startRowIndex; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + funcInputUpdate(pCtx); double v = 0; - if (pCtx->order == TSDB_ORDER_ASC) { - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + SFuncInputRow row = {0}; + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } - char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; + char* d = row.pData; GET_TYPED_DATA(v, double, pInputCol->info.type, d); int32_t pos = pCtx->offset + numOfElems; if (!pDerivInfo->valueSet) { // initial value is not set yet pDerivInfo->valueSet = true; } else { - if (tsList[i] == pDerivInfo->prevTs) { + if (row.ts == pDerivInfo->prevTs) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } - double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); + double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (row.ts - pDerivInfo->prevTs); if (pDerivInfo->ignoreNegative && r < 0) { } else { if (isinf(r) || isnan(r)) { @@ -5738,12 +6051,12 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { } if (pTsOutput != NULL) { - colDataSetInt64(pTsOutput, pos, &tsList[i]); + colDataSetInt64(pTsOutput, pos, &row.ts); } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); + appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); } numOfElems++; @@ -5751,25 +6064,26 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { } pDerivInfo->prevValue = v; - pDerivInfo->prevTs = tsList[i]; + pDerivInfo->prevTs = row.ts; } } else { - for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + SFuncInputRow row = {0}; + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } - char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i; + char* d = row.pData; GET_TYPED_DATA(v, double, pInputCol->info.type, d); int32_t pos = pCtx->offset + numOfElems; if (!pDerivInfo->valueSet) { // initial value is not set yet pDerivInfo->valueSet = true; } else { - if (tsList[i] == pDerivInfo->prevTs) { + if (row.ts == pDerivInfo->prevTs) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } - double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - tsList[i]); + double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - row.ts); if (pDerivInfo->ignoreNegative && r < 0) { } else { if (isinf(r) || isnan(r)) { @@ -5784,15 +6098,14 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); + appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); } - numOfElems++; } } pDerivInfo->prevValue = v; - pDerivInfo->prevTs = tsList[i]; + pDerivInfo->prevTs = row.ts; } } @@ -5833,51 +6146,52 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + funcInputUpdate(pCtx); int32_t numOfElems = 0; int32_t type = pInputCol->info.type; - - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + SFuncInputRow row = {0}; + while (funcInputGetNextRow(pCtx, &row)) { + if (row.isDataNull) { continue; } - numOfElems++; - - char* data = colDataGetData(pInputCol, i); + char* data = row.pData; double v = 0; GET_TYPED_DATA(v, double, type, data); if (INT64_MIN == pRateInfo->lastKey) { pRateInfo->lastValue = v; - pRateInfo->lastKey = tsList[i]; + pRateInfo->lastKey = row.ts; pRateInfo->hasResult = 1; continue; } - if (tsList[i] > pRateInfo->lastKey) { + if (row.ts > pRateInfo->lastKey) { if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) { pRateInfo->firstValue = pRateInfo->lastValue; pRateInfo->firstKey = pRateInfo->lastKey; } pRateInfo->lastValue = v; - pRateInfo->lastKey = tsList[i]; + pRateInfo->lastKey = row.ts; continue; - } else if (tsList[i] == pRateInfo->lastKey) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } else if (row.ts == pRateInfo->lastKey) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; } + - if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) { + if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) { pRateInfo->firstValue = v; - pRateInfo->firstKey = tsList[i]; - } else if (tsList[i] == pRateInfo->firstKey) { + pRateInfo->firstKey = row.ts; + } else if (row.ts == pRateInfo->firstKey) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } } + numOfElems++; + SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -6111,6 +6425,16 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; + SColumnInfoData* pkCol = pInput->pPrimaryKey; + pInfo->pkType = -1; + __compar_fn_t pkCompareFn = NULL; + if (pCtx->hasPrimaryKey) { + pInfo->pkType = pkCol->info.type; + pInfo->pkBytes = pkCol->info.bytes; + pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_DESC); + } + + // TODO it traverse the different way. // last_row function does not ignore the null value for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { numOfElems++; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 068d1532c0..ae3958647b 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -352,8 +352,12 @@ bool fmIsSkipScanCheckFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SKIP_SCAN_CHECK_FUNC); } -void getLastCacheDataType(SDataType* pType) { - pType->bytes = getFirstLastInfoSize(pType->bytes) + VARSTR_HEADER_SIZE; +bool fmIsPrimaryKeyFunc(int32_t funcId) { + return isSpecificClassifyFunc(funcId, FUNC_MGT_PRIMARY_KEY_FUNC); +} +void getLastCacheDataType(SDataType* pType, int32_t pkBytes) { + //TODO: do it later. + pType->bytes = getFirstLastInfoSize(pType->bytes, pkBytes) + VARSTR_HEADER_SIZE; pType->type = TSDB_DATA_TYPE_BINARY; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ac9f2b3b09..e767bdac6e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -804,6 +804,10 @@ static bool isImplicitTsFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsImplicitTsFunc(((SFunctionNode*)pNode)->funcId)); } +static bool isPkFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPrimaryKeyFunc(((SFunctionNode*)pNode)->funcId)); +} + static bool isScanPseudoColumnFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); } @@ -916,6 +920,10 @@ static bool isPrimaryKey(STempTableNode* pTable, SNode* pExpr) { return isPrimaryKeyImpl(pExpr); } +static bool hasPkInTable(STableMeta* pTableMeta) { + return pTableMeta->tableInfo.numOfColumns>=2 && pTableMeta->schema[1].flags | COL_IS_KEY; +} + static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* pColSchema, int32_t tagFlag, SColumnNode* pCol) { strcpy(pCol->dbName, pTable->table.dbName); @@ -930,6 +938,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p } pCol->tableId = pTable->pMeta->uid; pCol->tableType = pTable->pMeta->tableType; + pCol->tableHasPk = hasPkInTable(pTable->pMeta); pCol->colId = pColSchema->colId; pCol->colType = (tagFlag >= 0 ? COLUMN_TYPE_TAG : COLUMN_TYPE_COLUMN); pCol->hasIndex = (pColSchema != NULL && IS_IDX_ON(pColSchema)); @@ -4615,6 +4624,63 @@ static int32_t appendTsForImplicitTsFunc(STranslateContext* pCxt, SSelectStmt* p return pCxt->errCode; } +static int32_t createPkColByTable(STranslateContext* pCxt, SRealTableNode* pTable, SNode** pPk) { + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->colId = pTable->pMeta->schema[1].colId; + strcpy(pCol->colName, pTable->pMeta->schema[1].name); + bool found = false; + int32_t code = findAndSetColumn(pCxt, &pCol, (STableNode*)pTable, &found); + if (TSDB_CODE_SUCCESS != code || !found) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTERNAL_ERROR); + } + *pPk = (SNode*)pCol; + return TSDB_CODE_SUCCESS; +} + + +static EDealRes hasPkColImpl(SNode* pNode, void* pContext) { + if (nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->tableHasPk) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + +static bool hasPkCol(SNode* pNode) { + bool hasPk = false; + nodesWalkExprPostOrder(pNode, hasPkColImpl, &hasPk); + return hasPk; +} + +static EDealRes appendPkForPkFuncImpl(SNode* pNode, void* pContext) { + STranslateContext* pCxt = pContext; + STableNode* pTable = NULL; + int32_t code = findTable(pCxt, NULL, &pTable); + if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable) && isPkFunc(pNode) && hasPkCol(pNode)) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + SRealTableNode* pRealTable = (SRealTableNode*)pTable; + SNode* pPk = NULL; + pCxt->errCode = createPkColByTable(pCxt, pRealTable, &pPk); + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pCxt->errCode = nodesListMakeStrictAppend(&pFunc->pParameterList, pPk); + } + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pFunc->hasPk = true; + pFunc->pkBytes = ((SColumnNode*)pPk)->node.resType.bytes; + } + return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR; + } + return DEAL_RES_CONTINUE; +} + +static int32_t appendPkParamForPkFunc(STranslateContext* pCxt, SSelectStmt* pSelect) { + nodesWalkSelectStmt(pSelect, SQL_CLAUSE_FROM, appendPkForPkFuncImpl, pCxt); + return pCxt->errCode; +} + typedef struct SReplaceOrderByAliasCxt { STranslateContext* pTranslateCxt; SNodeList* pProjectionList; @@ -4755,6 +4821,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = appendTsForImplicitTsFunc(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = appendPkParamForPkFunc(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = replaceOrderByAliasForSelect(pCxt, pSelect); } @@ -5939,6 +6008,14 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in if (TSDB_CODE_SUCCESS == code && pCol->is_pk && colIndex != 1) { code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SECOND_COL_PK); } + if (TSDB_CODE_SUCCESS == code && pCol->is_pk && + !(TSDB_DATA_TYPE_INT == pCol->dataType.type || + TSDB_DATA_TYPE_UINT == pCol->dataType.type || + TSDB_DATA_TYPE_BIGINT == pCol->dataType.type || + TSDB_DATA_TYPE_UBIGINT == pCol->dataType.type || + TSDB_DATA_TYPE_VARCHAR == pCol->dataType.type)) { + code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_COL_PK_TYPE); + } if (TSDB_CODE_SUCCESS == code && pCol->dataType.type == TSDB_DATA_TYPE_JSON) { code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON); } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 91b60e07e4..1581f3ff07 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -198,6 +198,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "tag %s can not be primary key"; case TSDB_CODE_PAR_SECOND_COL_PK: return "primary key column must be second column"; + case TSDB_CODE_PAR_COL_PK_TYPE: + return "primary key column must be of type int, uint, bigint, ubigint, and varchar"; default: return "Unknown error"; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 9cb67dc968..377c620b6c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2747,6 +2747,7 @@ typedef struct SLastRowScanOptSetColDataTypeCxt { SNodeList* pLastCols; SNodeList* pOtherCols; int32_t funcType; + int32_t pkBytes; } SLastRowScanOptSetColDataTypeCxt; static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { @@ -2754,12 +2755,12 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { SLastRowScanOptSetColDataTypeCxt* pCxt = pContext; if (pCxt->doAgg) { nodesListMakeAppend(&pCxt->pLastCols, pNode); - getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType)); + getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes); } else { SNode* pCol = NULL; FOREACH(pCol, pCxt->pLastCols) { if (nodesEqualNode(pCol, pNode)) { - getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType)); + getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes); break; } } @@ -2769,14 +2770,14 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase) { +static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase, int32_t pkBytes) { SNode* pTarget = NULL; WHERE_EACH(pTarget, pTargets) { bool found = false; SNode* pCol = NULL; FOREACH(pCol, pLastCols) { if (nodesEqualNode(pCol, pTarget)) { - getLastCacheDataType(&(((SColumnNode*)pTarget)->node.resType)); + getLastCacheDataType(&(((SColumnNode*)pTarget)->node.resType), pkBytes); found = true; break; } @@ -2883,6 +2884,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic return code; } cxt.funcType = pFunc->funcType; + cxt.pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0; // add duplicate cols which be removed for both last_row, last if (pAgg->hasLast && pAgg->hasLastRow) { if (QUERY_NODE_COLUMN == nodeType(pParamNode)) { @@ -2961,9 +2963,9 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic if (NULL != cxt.pLastCols) { cxt.doAgg = false; cxt.funcType = FUNCTION_TYPE_CACHE_LAST; - lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true); + lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true, cxt.pkBytes); nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); - lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false); + lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes); lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols); if (pPKTsCol && pScan->node.pTargets->length == 1) { // when select last(ts),ts from ..., we add another ts to targets diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 14c89106c5..ffc6f8a42f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -622,6 +622,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is confli TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, "Operator not supported multi result") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY, "tag can not be primary key") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be second column") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner