feat: begin coding function
This commit is contained in:
parent
34c849bdbe
commit
cee96e2048
|
@ -107,6 +107,9 @@ typedef struct SFirstLastRes {
|
|||
bool isNull;
|
||||
int32_t bytes;
|
||||
int64_t ts;
|
||||
char* pkData;
|
||||
int32_t pkBytes;
|
||||
int8_t pkType;
|
||||
STuplePos pos;
|
||||
char buf[];
|
||||
} SFirstLastRes;
|
||||
|
|
|
@ -118,6 +118,7 @@ typedef struct SInputColumnInfoData {
|
|||
int32_t numOfInputCols; // PTS is not included
|
||||
bool colDataSMAIsSet; // if agg is set or not
|
||||
SColumnInfoData *pPTS; // primary timestamp column
|
||||
SColumnInfoData *pPrimaryKey; // primary key column
|
||||
SColumnInfoData **pData;
|
||||
SColumnDataAgg **pColumnDataAgg;
|
||||
uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions.
|
||||
|
@ -180,6 +181,43 @@ typedef struct SFunctionStateStore {
|
|||
int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
|
||||
} SFunctionStateStore;
|
||||
|
||||
typedef struct SFuncInputRow {
|
||||
TSKEY ts;
|
||||
bool isDataNull;
|
||||
char* pData;
|
||||
|
||||
SSDataBlock* block; // prev row block or src block
|
||||
int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock
|
||||
|
||||
//TODO:
|
||||
// int32_t startOffset; // for diff, derivative
|
||||
// SPoint1 startPoint; // for twa
|
||||
} SFuncInputRow;
|
||||
|
||||
typedef struct SFuncInputRowIter {
|
||||
bool hasPrev;
|
||||
|
||||
SInputColumnInfoData* pInput;
|
||||
SColumnInfoData* pData;
|
||||
TSKEY* tsList;
|
||||
int32_t rowIndex;
|
||||
int32_t inputEndIndex;
|
||||
SSDataBlock* pSrcBlock;
|
||||
|
||||
TSKEY prevBlockTsEnd;
|
||||
bool prevIsDataNull;
|
||||
char* pPrevData;
|
||||
SSDataBlock* pPrevRowBlock; // pre one row block
|
||||
|
||||
//TODO:
|
||||
// int32_t prevStartOffset; // for diff, derivative.
|
||||
// SPoint1 prevStartPoint; // for twa.
|
||||
// int32_t startOffset; // for diff, derivative.
|
||||
// SPoint1 startPoint; // for twa.
|
||||
|
||||
bool finalRow;
|
||||
} SFuncInputRowIter;
|
||||
|
||||
// sql function runtime context
|
||||
typedef struct SqlFunctionCtx {
|
||||
SInputColumnInfoData input;
|
||||
|
@ -209,6 +247,9 @@ typedef struct SqlFunctionCtx {
|
|||
int32_t exprIdx;
|
||||
char *udfName;
|
||||
SFunctionStateStore *pStore;
|
||||
bool hasPrimaryKey;
|
||||
SFuncInputRowIter rowIter;
|
||||
bool bInputFinished;
|
||||
} SqlFunctionCtx;
|
||||
|
||||
typedef struct tExprNode {
|
||||
|
|
|
@ -240,8 +240,9 @@ bool fmIsGroupKeyFunc(int32_t funcId);
|
|||
bool fmIsBlockDistFunc(int32_t funcId);
|
||||
bool fmIsConstantResFunc(SFunctionNode* pFunc);
|
||||
bool fmIsSkipScanCheckFunc(int32_t funcId);
|
||||
bool fmIsPrimaryKeyFunc(int32_t funcId);
|
||||
|
||||
void getLastCacheDataType(SDataType* pType);
|
||||
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
|
||||
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
|
||||
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);
|
||||
|
|
|
@ -75,6 +75,7 @@ typedef struct SColumnNode {
|
|||
SExprNode node; // QUERY_NODE_COLUMN
|
||||
uint64_t tableId;
|
||||
int8_t tableType;
|
||||
bool tableHasPk;
|
||||
col_id_t colId;
|
||||
uint16_t projIdx; // the idx in project list, start from 1
|
||||
EColumnType colType; // column or tag
|
||||
|
@ -163,6 +164,8 @@ typedef struct SFunctionNode {
|
|||
int32_t funcType;
|
||||
SNodeList* pParameterList;
|
||||
int32_t udfBufSize;
|
||||
bool hasPk;
|
||||
int32_t pkBytes;
|
||||
} SFunctionNode;
|
||||
|
||||
typedef struct STableNode {
|
||||
|
|
|
@ -760,6 +760,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT TAOS_DEF_ERROR_CODE(0, 0x2670)
|
||||
#define TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x2671)
|
||||
#define TSDB_CODE_PAR_SECOND_COL_PK TAOS_DEF_ERROR_CODE(0, 0x2672)
|
||||
#define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673)
|
||||
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
||||
|
||||
//planner
|
||||
|
|
|
@ -303,6 +303,12 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
|
|||
pInput->colDataSMAIsSet = false;
|
||||
|
||||
SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
|
||||
bool hasPk = pOneExpr->pExpr->nodeType == QUERY_NODE_FUNCTION && pOneExpr->pExpr->_function.pFunctNode->hasPk;
|
||||
pCtx[i].hasPrimaryKey = hasPk;
|
||||
|
||||
int16_t tsParamIdx = (!hasPk) ? pOneExpr->base.numOfParams - 1 : pOneExpr->base.numOfParams - 2;
|
||||
int16_t pkParamIdx = pOneExpr->base.numOfParams - 1;
|
||||
|
||||
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
||||
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
|
||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||
|
@ -315,9 +321,13 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
|
|||
|
||||
// NOTE: the last parameter is the primary timestamp column
|
||||
// todo: refactor this
|
||||
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||
|
||||
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == tsParamIdx)) {
|
||||
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
|
||||
}
|
||||
if (hasPk && (j == pkParamIdx)) {
|
||||
pInput->pPrimaryKey = pInput->pData[j];
|
||||
}
|
||||
ASSERT(pInput->pData[j] != NULL);
|
||||
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||
// todo avoid case: top(k, 12), 12 is the value parameter.
|
||||
|
|
|
@ -157,7 +157,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
|||
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes);
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes);
|
||||
EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow);
|
||||
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow);
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ extern "C" {
|
|||
#define FUNC_MGT_GEOMETRY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(24)
|
||||
#define FUNC_MGT_FORBID_SYSTABLE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(25)
|
||||
#define FUNC_MGT_SKIP_SCAN_CHECK_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(26)
|
||||
#define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(27)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
|
|
|
@ -1782,9 +1782,9 @@ static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
|
||||
pFunc->node.resType =
|
||||
(SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
(SDataType){.bytes = getFirstLastInfoSize(paraBytes, pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
} else {
|
||||
if (TSDB_DATA_TYPE_BINARY != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -2753,7 +2753,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "interp",
|
||||
.type = FUNCTION_TYPE_INTERP,
|
||||
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
|
||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateInterp,
|
||||
.getEnvFunc = getSelectivityFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2765,7 +2765,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "derivative",
|
||||
.type = FUNCTION_TYPE_DERIVATIVE,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateDerivative,
|
||||
.getEnvFunc = getDerivativeFuncEnv,
|
||||
.initFunc = derivativeFuncSetup,
|
||||
|
@ -2778,7 +2778,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "irate",
|
||||
.type = FUNCTION_TYPE_IRATE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateIrate,
|
||||
.getEnvFunc = getIrateFuncEnv,
|
||||
.initFunc = irateFuncSetup,
|
||||
|
@ -2792,7 +2792,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_irate_partial",
|
||||
.type = FUNCTION_TYPE_IRATE_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateIratePartial,
|
||||
.getEnvFunc = getIrateFuncEnv,
|
||||
.initFunc = irateFuncSetup,
|
||||
|
@ -2815,7 +2815,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "last_row",
|
||||
.type = FUNCTION_TYPE_LAST_ROW,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.dynDataRequiredFunc = lastDynDataReq,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
|
@ -2852,7 +2852,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_last_row_partial",
|
||||
.type = FUNCTION_TYPE_LAST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.dynDataRequiredFunc = lastDynDataReq,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
|
@ -2864,7 +2864,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_last_row_merge",
|
||||
.type = FUNCTION_TYPE_LAST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2875,7 +2875,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "first",
|
||||
.type = FUNCTION_TYPE_FIRST,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.dynDataRequiredFunc = firstDynDataReq,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
|
@ -2891,7 +2891,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_first_partial",
|
||||
.type = FUNCTION_TYPE_FIRST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.dynDataRequiredFunc = firstDynDataReq,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
|
@ -2904,7 +2904,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_first_merge",
|
||||
.type = FUNCTION_TYPE_FIRST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2916,7 +2916,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "last",
|
||||
.type = FUNCTION_TYPE_LAST,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.dynDataRequiredFunc = lastDynDataReq,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
|
@ -2932,7 +2932,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_last_partial",
|
||||
.type = FUNCTION_TYPE_LAST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.dynDataRequiredFunc = lastDynDataReq,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
|
@ -2945,7 +2945,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "_last_merge",
|
||||
.type = FUNCTION_TYPE_LAST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2957,7 +2957,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "twa",
|
||||
.type = FUNCTION_TYPE_TWA,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
|
||||
FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateInNumOutDou,
|
||||
.dataRequiredFunc = statisDataRequired,
|
||||
.getEnvFunc = getTwaFuncEnv,
|
||||
|
@ -3060,7 +3060,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
|
@ -3144,7 +3144,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "unique",
|
||||
.type = FUNCTION_TYPE_UNIQUE,
|
||||
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateUnique,
|
||||
.getEnvFunc = getUniqueFuncEnv,
|
||||
.initFunc = uniqueFunctionSetup,
|
||||
|
|
|
@ -418,6 +418,209 @@ typedef struct SGroupKeyInfo {
|
|||
(_p).val = (_v); \
|
||||
} while (0)
|
||||
|
||||
int32_t funcInputUpdate(SqlFunctionCtx* pCtx) {
|
||||
SFuncInputRowIter* pIter = &pCtx->rowIter;
|
||||
|
||||
if (!pCtx->bInputFinished) {
|
||||
pIter->pInput = &pCtx->input;
|
||||
pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData;
|
||||
pIter->pData = pIter->pInput->pData[0];
|
||||
pIter->rowIndex = pIter->pInput->startRowIndex;
|
||||
pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1;
|
||||
pIter->pSrcBlock = pCtx->pSrcBlock;
|
||||
} else {
|
||||
pIter->finalRow = true;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
|
||||
if (pIter->finalRow) {
|
||||
if (pIter->hasPrev) {
|
||||
pRow->ts = pIter->prevBlockTsEnd;
|
||||
pRow->isDataNull = pIter->prevIsDataNull;
|
||||
pRow->pData = pIter->pPrevData;
|
||||
pRow->block = pIter->pPrevRowBlock;
|
||||
pRow->rowIndex = 0;
|
||||
|
||||
pIter->hasPrev = false;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (pIter->hasPrev) {
|
||||
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
|
||||
blockDataDestroy(pIter->pPrevRowBlock);
|
||||
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
||||
pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex);
|
||||
pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes);
|
||||
char* srcData = colDataGetData(pIter->pData, pIter->inputEndIndex);
|
||||
memcpy(pIter->pPrevData, srcData, pIter->pData->info.bytes);
|
||||
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
||||
|
||||
pIter->hasPrev = true;
|
||||
return false;
|
||||
} else {
|
||||
int32_t idx = pIter->rowIndex;
|
||||
while (pIter->tsList[idx] == pIter->prevBlockTsEnd) {
|
||||
++idx;
|
||||
}
|
||||
pRow->ts = pIter->prevBlockTsEnd;
|
||||
if (idx == pIter->pInput->startRowIndex) {
|
||||
pRow->isDataNull = pIter->prevIsDataNull;
|
||||
pRow->pData = pIter->pPrevData;
|
||||
pRow->block = pIter->pPrevRowBlock;
|
||||
pRow->rowIndex = 0;
|
||||
} else {
|
||||
pRow->ts = pIter->tsList[idx - 1];
|
||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx - 1);
|
||||
pRow->pData = colDataGetData(pIter->pData, idx - 1);
|
||||
pRow->block = pIter->pSrcBlock;
|
||||
pRow->rowIndex = idx - 1;
|
||||
}
|
||||
pIter->hasPrev = false;
|
||||
pIter->rowIndex = idx;
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex];
|
||||
if (pIter->tsList[pIter->rowIndex] != tsEnd) {
|
||||
int32_t idx = pIter->rowIndex;
|
||||
while (pIter->tsList[idx + 1] == pIter->tsList[pIter->rowIndex]) {
|
||||
++idx;
|
||||
}
|
||||
pRow->ts = pIter->tsList[idx];
|
||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx);
|
||||
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex);
|
||||
pRow->block = pIter->pSrcBlock;
|
||||
pRow->rowIndex = idx;
|
||||
|
||||
pIter->rowIndex = idx + 1;
|
||||
return true;
|
||||
} else {
|
||||
pIter->hasPrev = true;
|
||||
pIter->prevBlockTsEnd = tsEnd;
|
||||
// TODO
|
||||
pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex);
|
||||
pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes);
|
||||
memcpy(pIter->pPrevData, colDataGetData(pIter->pData, pIter->inputEndIndex), pIter->pData->info.bytes);
|
||||
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||
if (pIter->hasPrev) {
|
||||
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
|
||||
pIter->hasPrev = true;
|
||||
return false;
|
||||
} else {
|
||||
int32_t idx = pIter->rowIndex;
|
||||
while (pIter->tsList[idx] == pIter->prevBlockTsEnd) {
|
||||
++idx;
|
||||
}
|
||||
pRow->ts = pIter->tsList[idx];
|
||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx);
|
||||
pRow->pData = colDataGetData(pIter->pData, idx);
|
||||
pRow->block = pIter->pSrcBlock;
|
||||
pRow->rowIndex = idx;
|
||||
|
||||
pIter->hasPrev = false;
|
||||
pIter->rowIndex = idx + 1;
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (pIter->rowIndex <= pIter->inputEndIndex) {
|
||||
pRow->ts = pIter->tsList[pIter->rowIndex];
|
||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex);
|
||||
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex);
|
||||
pRow->block = pIter->pSrcBlock;
|
||||
pRow->rowIndex = pIter->rowIndex;
|
||||
|
||||
TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex];
|
||||
if (pIter->tsList[pIter->rowIndex] != tsEnd) {
|
||||
int32_t idx = pIter->rowIndex + 1;
|
||||
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) {
|
||||
++idx;
|
||||
}
|
||||
pIter->rowIndex = idx;
|
||||
} else {
|
||||
pIter->rowIndex = pIter->inputEndIndex + 1;
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex];
|
||||
pIter->hasPrev = true;
|
||||
pIter->prevBlockTsEnd = tsEnd;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||
if (pIter->rowIndex <= pIter->inputEndIndex) {
|
||||
pRow->ts = pIter->tsList[pIter->rowIndex];
|
||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex);
|
||||
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex);
|
||||
pRow->block = pIter->pSrcBlock;
|
||||
pRow->rowIndex = pIter->rowIndex;
|
||||
|
||||
++pIter->rowIndex;
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
||||
SFuncInputRowIter* pIter = &pCtx->rowIter;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
return funcInputGetNextRowDescPk(pIter, pRow);
|
||||
} else {
|
||||
return funcInputGetNextRowAscPk(pIter, pRow);
|
||||
}
|
||||
} else {
|
||||
return funcInputGetNextRowNoPk(pIter, pRow);
|
||||
}
|
||||
}
|
||||
|
||||
// This function append the selectivity to subsidiaries function context directly, without fetching data
|
||||
// from intermediate disk based buf page
|
||||
void appendSelectivityCols(SqlFunctionCtx* pCtx, SSDataBlock* pSrcBlock, int32_t rowIndex, int32_t pos) {
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
|
||||
// get data from source col
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||
|
||||
char* pData = colDataGetData(pSrcCol, rowIndex);
|
||||
|
||||
// append to dest col
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId);
|
||||
|
||||
if (colDataIsNull_s(pSrcCol, rowIndex) == true) {
|
||||
colDataSetNULL(pDstCol, pos);
|
||||
} else {
|
||||
colDataSetVal(pDstCol, pos, pData, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom);
|
||||
|
||||
static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst);
|
||||
|
||||
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
|
@ -2119,11 +2322,14 @@ EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; }
|
||||
//TODO modify it to include primary key bytes
|
||||
int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes) { return sizeof(SFirstLastRes) + resBytes + pkBytes; }
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
pEnv->calcMemSize = getFirstLastInfoSize(pNode->node.resType.bytes);
|
||||
//TODO: change SFunctionNode to add pk info
|
||||
int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
|
||||
pEnv->calcMemSize = getFirstLastInfoSize(pNode->node.resType.bytes, pkBytes);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2177,7 +2383,7 @@ static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowI
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
|
||||
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, char* pkData, int32_t type, char* pData) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
|
@ -2186,6 +2392,14 @@ static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t
|
|||
}
|
||||
|
||||
memcpy(pInfo->buf, pData, pInfo->bytes);
|
||||
if (pkData != NULL) {
|
||||
if (IS_VAR_DATA_TYPE(pInfo->pkType)) {
|
||||
pInfo->pkBytes = varDataTLen(pkData);
|
||||
}
|
||||
memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes);
|
||||
pInfo->pkData = pInfo->buf + pInfo->bytes;
|
||||
}
|
||||
|
||||
pInfo->ts = currentTs;
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2213,6 +2427,15 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||
pInfo->pkType = -1;
|
||||
__compar_fn_t pkCompareFn = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pInfo->pkType = pkCol->info.type;
|
||||
pInfo->pkBytes = pkCol->info.bytes;
|
||||
pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||
pInputCol->hasNull == true) {
|
||||
|
@ -2283,17 +2506,24 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
#else
|
||||
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||
|
||||
int from = -1;
|
||||
int32_t i = -1;
|
||||
while (funcInputGetNextRowIndex(pInput, from, true, &i, &from)) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* pkData = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pkData = colDataGetData(pkCol, i);
|
||||
}
|
||||
TSKEY cts = pts[i];
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts > cts ||
|
||||
(pInfo->ts == cts && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, pkData, pInputCol->info.type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2330,6 +2560,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||
pInfo->pkType = -1;
|
||||
__compar_fn_t pkCompareFn = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pInfo->pkType = pkCol->info.type;
|
||||
pInfo->pkBytes = pkCol->info.bytes;
|
||||
pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_DESC);
|
||||
}
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) &&
|
||||
pInputCol->hasNull == true) {
|
||||
|
@ -2400,7 +2639,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
#else
|
||||
if (!pInputCol->hasNull) {
|
||||
if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) {
|
||||
numOfElems = 1;
|
||||
|
||||
int32_t round = pInput->numOfRows >> 2;
|
||||
|
@ -2427,7 +2666,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||
char* data = colDataGetData(pInputCol, chosen);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, cts, NULL, type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2438,7 +2677,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], NULL, type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2446,16 +2685,22 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||
int from = -1;
|
||||
int32_t i = -1;
|
||||
while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) {
|
||||
if (colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
|
||||
char* pkData = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pkData = colDataGetData(pkCol, i);
|
||||
}
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i] ||
|
||||
(pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
|
||||
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], pkData, type, data);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2483,14 +2728,19 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
|
|||
if (!pInput->hasResult) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
__compar_fn_t pkCompareFn = NULL;
|
||||
if (pInput->pkData) {
|
||||
pkCompareFn = getKeyComparFunc(pInput->pkType, (isFirst) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC);
|
||||
}
|
||||
if (pOutput->hasResult) {
|
||||
if (isFirst) {
|
||||
if (pInput->ts > pOutput->ts) {
|
||||
if (pInput->ts > pOutput->ts ||
|
||||
(pInput->ts == pOutput->ts && pkCompareFn && pkCompareFn(pInput->pkData, pOutput->pkData) > 0)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else {
|
||||
if (pInput->ts < pOutput->ts) {
|
||||
if (pInput->ts < pOutput->ts ||
|
||||
(pInput->ts == pOutput->ts && pkCompareFn && pkCompareFn(pInput->pkData, pOutput->pkData) < 0)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
@ -2501,6 +2751,10 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
|
|||
pOutput->bytes = pInput->bytes;
|
||||
|
||||
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||
if (pInput->pkData) {
|
||||
pOutput->pkBytes = pInput->pkBytes;
|
||||
memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2535,6 +2789,11 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
|||
}
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pInputInfo->pkData = pInputInfo->buf + pInputInfo->bytes;
|
||||
} else {
|
||||
pInputInfo->pkData = NULL;
|
||||
}
|
||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2575,7 +2834,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
|
||||
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
|
||||
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes, pRes->pkBytes);
|
||||
|
||||
// todo check for failure
|
||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
@ -2613,6 +2872,8 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
|||
static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||
|
||||
|
||||
if (colDataIsNull_s(pInputCol, rowIndex)) {
|
||||
pInfo->isNull = true;
|
||||
|
@ -2626,6 +2887,14 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex
|
|||
memcpy(pInfo->buf, pData, pInfo->bytes);
|
||||
}
|
||||
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
char* pkData = colDataGetData(pkCol, rowIndex);
|
||||
if (IS_VAR_DATA_TYPE(pInfo->pkType)) {
|
||||
pInfo->pkBytes = varDataTLen(pkData);
|
||||
}
|
||||
memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes);
|
||||
pInfo->pkData = pInfo->buf + pInfo->bytes;
|
||||
}
|
||||
pInfo->ts = cts;
|
||||
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2653,7 +2922,14 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
|||
if (IS_NULL_TYPE(type)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||
pInfo->pkType = -1;
|
||||
__compar_fn_t pkCompareFn = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pInfo->pkType = pkCol->info.type;
|
||||
pInfo->pkBytes = pkCol->info.bytes;
|
||||
pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_DESC);
|
||||
}
|
||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||
|
||||
|
@ -2689,13 +2965,20 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
|||
#else
|
||||
|
||||
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
int from = -1;
|
||||
int32_t i = -1;
|
||||
while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) {
|
||||
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
|
||||
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
|
||||
TSKEY cts = pts[i];
|
||||
|
||||
numOfElems++;
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||
char* pkData = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pkData = colDataGetData(pkCol, i);
|
||||
}
|
||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts ||
|
||||
(pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
|
||||
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2854,13 +3137,52 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//TODO: the primary key compare can be skipped for ordered pk if knonwn before
|
||||
//TODO: for desc ordered, pk shall select the smallest one for one ts. if across block boundaries.
|
||||
bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom) {
|
||||
if (pInput->pPrimaryKey == NULL) {
|
||||
if (from == -1) {
|
||||
from = pInput->startRowIndex;
|
||||
} else if (from >= pInput->numOfRows + pInput->startRowIndex) {
|
||||
return false;
|
||||
}
|
||||
*pRowIndex = from;
|
||||
*nextFrom = from + 1;
|
||||
return true;
|
||||
} else {
|
||||
if (from == -1) {
|
||||
from = pInput->startRowIndex;
|
||||
} else if (from >= pInput->numOfRows + pInput->startRowIndex) {
|
||||
return false;
|
||||
}
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||
int8_t pkType = pkCol->info.type;
|
||||
int32_t order = (firstOccur) ? TSDB_ORDER_ASC: TSDB_ORDER_DESC;
|
||||
__compar_fn_t compareFunc = getKeyComparFunc(pkType, order);
|
||||
int32_t select = from;
|
||||
char* val = colDataGetData(pkCol, select);
|
||||
while (from < pInput->numOfRows + pInput->startRowIndex - 1 && tsList[from + 1] == tsList[from]) {
|
||||
char* val1 = colDataGetData(pkCol, from + 1);
|
||||
if (compareFunc(val1, val) < 0) {
|
||||
select = from + 1;
|
||||
val = val1;
|
||||
}
|
||||
from = from + 1;
|
||||
}
|
||||
*pRowIndex = select;
|
||||
*nextFrom = from + 1;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
int8_t inputType = pInputCol->info.type;
|
||||
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
|
@ -2869,10 +3191,13 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
funcInputUpdate(pCtx);
|
||||
|
||||
SFuncInputRow row = {0};
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
if (row.isDataNull) {
|
||||
if (pDiffInfo->includeNull) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
|
||||
|
@ -2881,24 +3206,24 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
char* pv = colDataGetData(pInputCol, i);
|
||||
char* pv = row.pData;
|
||||
|
||||
if (pDiffInfo->hasPrev) {
|
||||
if (tsList[i] == pDiffInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
if (row.ts == pDiffInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
int32_t code = doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, tsList[i]);
|
||||
int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, row.ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
|
||||
}
|
||||
|
||||
|
||||
numOfElems++;
|
||||
} else {
|
||||
int32_t code = doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]);
|
||||
int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, row.ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -5196,36 +5521,35 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
goto _twa_over;
|
||||
}
|
||||
|
||||
int32_t i = pInput->startRowIndex;
|
||||
funcInputUpdate(pCtx);
|
||||
SFuncInputRow row = {0};
|
||||
if (pCtx->start.key != INT64_MIN && last->key == INT64_MIN) {
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
|
||||
last->key = tsList[i];
|
||||
last->key = row.ts;
|
||||
|
||||
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
|
||||
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
|
||||
|
||||
pInfo->dOutput += twa_get_area(pCtx->start, *last);
|
||||
pInfo->win.skey = pCtx->start.key;
|
||||
numOfElems++;
|
||||
i += 1;
|
||||
break;
|
||||
}
|
||||
} else if (pInfo->p.key == INT64_MIN) {
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
|
||||
last->key = tsList[i];
|
||||
last->key = row.ts;
|
||||
|
||||
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
|
||||
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
|
||||
|
||||
pInfo->win.skey = last->key;
|
||||
numOfElems++;
|
||||
i += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -5235,14 +5559,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
// calculate the value of
|
||||
switch (pInputCol->info.type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* val = (int8_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5254,14 +5577,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* val = (int16_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5272,14 +5594,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* val = (int32_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5290,14 +5611,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* val = (int64_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5308,14 +5628,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float* val = (float*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5326,14 +5645,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double* val = (double*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(double*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5344,14 +5662,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
uint8_t* val = (uint8_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5362,14 +5679,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
uint16_t* val = (uint16_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5380,14 +5696,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
uint32_t* val = (uint32_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5398,14 +5713,13 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
uint64_t* val = (uint64_t*)colDataGetData(pInputCol, 0);
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
numOfElems++;
|
||||
|
||||
INIT_INTP_POINT(st, tsList[i], val[i]);
|
||||
INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData);
|
||||
if (pInfo->p.key == st.key) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
@ -5707,28 +6021,27 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
|
||||
int32_t i = pInput->startRowIndex;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
funcInputUpdate(pCtx);
|
||||
|
||||
double v = 0;
|
||||
|
||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
SFuncInputRow row = {0};
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i;
|
||||
char* d = row.pData;
|
||||
GET_TYPED_DATA(v, double, pInputCol->info.type, d);
|
||||
|
||||
int32_t pos = pCtx->offset + numOfElems;
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
if (tsList[i] == pDerivInfo->prevTs) {
|
||||
if (row.ts == pDerivInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (row.ts - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && r < 0) {
|
||||
} else {
|
||||
if (isinf(r) || isnan(r)) {
|
||||
|
@ -5738,12 +6051,12 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pTsOutput != NULL) {
|
||||
colDataSetInt64(pTsOutput, pos, &tsList[i]);
|
||||
colDataSetInt64(pTsOutput, pos, &row.ts);
|
||||
}
|
||||
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
@ -5751,25 +6064,26 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
pDerivInfo->prevValue = v;
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
pDerivInfo->prevTs = row.ts;
|
||||
}
|
||||
} else {
|
||||
for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
SFuncInputRow row = {0};
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* d = (char*)pInputCol->pData + pInputCol->info.bytes * i;
|
||||
char* d = row.pData;
|
||||
GET_TYPED_DATA(v, double, pInputCol->info.type, d);
|
||||
|
||||
int32_t pos = pCtx->offset + numOfElems;
|
||||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
if (tsList[i] == pDerivInfo->prevTs) {
|
||||
if (row.ts == pDerivInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - tsList[i]);
|
||||
double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - row.ts);
|
||||
if (pDerivInfo->ignoreNegative && r < 0) {
|
||||
} else {
|
||||
if (isinf(r) || isnan(r)) {
|
||||
|
@ -5784,15 +6098,14 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
}
|
||||
}
|
||||
|
||||
pDerivInfo->prevValue = v;
|
||||
pDerivInfo->prevTs = tsList[i];
|
||||
pDerivInfo->prevTs = row.ts;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5833,51 +6146,52 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
funcInputUpdate(pCtx);
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
int32_t type = pInputCol->info.type;
|
||||
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
SFuncInputRow row = {0};
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
if (row.isDataNull) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
char* data = row.pData;
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
|
||||
if (INT64_MIN == pRateInfo->lastKey) {
|
||||
pRateInfo->lastValue = v;
|
||||
pRateInfo->lastKey = tsList[i];
|
||||
pRateInfo->lastKey = row.ts;
|
||||
pRateInfo->hasResult = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tsList[i] > pRateInfo->lastKey) {
|
||||
if (row.ts > pRateInfo->lastKey) {
|
||||
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
|
||||
pRateInfo->firstValue = pRateInfo->lastValue;
|
||||
pRateInfo->firstKey = pRateInfo->lastKey;
|
||||
}
|
||||
|
||||
pRateInfo->lastValue = v;
|
||||
pRateInfo->lastKey = tsList[i];
|
||||
pRateInfo->lastKey = row.ts;
|
||||
|
||||
continue;
|
||||
} else if (tsList[i] == pRateInfo->lastKey) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
} else if (row.ts == pRateInfo->lastKey) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
||||
|
||||
if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) {
|
||||
if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) {
|
||||
pRateInfo->firstValue = v;
|
||||
pRateInfo->firstKey = tsList[i];
|
||||
} else if (tsList[i] == pRateInfo->firstKey) {
|
||||
pRateInfo->firstKey = row.ts;
|
||||
} else if (row.ts == pRateInfo->firstKey) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
||||
SET_VAL(pResInfo, numOfElems, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -6111,6 +6425,16 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t bytes = pInputCol->info.bytes;
|
||||
pInfo->bytes = bytes;
|
||||
|
||||
SColumnInfoData* pkCol = pInput->pPrimaryKey;
|
||||
pInfo->pkType = -1;
|
||||
__compar_fn_t pkCompareFn = NULL;
|
||||
if (pCtx->hasPrimaryKey) {
|
||||
pInfo->pkType = pkCol->info.type;
|
||||
pInfo->pkBytes = pkCol->info.bytes;
|
||||
pkCompareFn = getKeyComparFunc(pInfo->pkType, TSDB_ORDER_DESC);
|
||||
}
|
||||
|
||||
// TODO it traverse the different way.
|
||||
// last_row function does not ignore the null value
|
||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
numOfElems++;
|
||||
|
|
|
@ -352,8 +352,12 @@ bool fmIsSkipScanCheckFunc(int32_t funcId) {
|
|||
return isSpecificClassifyFunc(funcId, FUNC_MGT_SKIP_SCAN_CHECK_FUNC);
|
||||
}
|
||||
|
||||
void getLastCacheDataType(SDataType* pType) {
|
||||
pType->bytes = getFirstLastInfoSize(pType->bytes) + VARSTR_HEADER_SIZE;
|
||||
bool fmIsPrimaryKeyFunc(int32_t funcId) {
|
||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_PRIMARY_KEY_FUNC);
|
||||
}
|
||||
void getLastCacheDataType(SDataType* pType, int32_t pkBytes) {
|
||||
//TODO: do it later.
|
||||
pType->bytes = getFirstLastInfoSize(pType->bytes, pkBytes) + VARSTR_HEADER_SIZE;
|
||||
pType->type = TSDB_DATA_TYPE_BINARY;
|
||||
}
|
||||
|
||||
|
|
|
@ -804,6 +804,10 @@ static bool isImplicitTsFunc(const SNode* pNode) {
|
|||
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsImplicitTsFunc(((SFunctionNode*)pNode)->funcId));
|
||||
}
|
||||
|
||||
static bool isPkFunc(const SNode* pNode) {
|
||||
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPrimaryKeyFunc(((SFunctionNode*)pNode)->funcId));
|
||||
}
|
||||
|
||||
static bool isScanPseudoColumnFunc(const SNode* pNode) {
|
||||
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
|
||||
}
|
||||
|
@ -916,6 +920,10 @@ static bool isPrimaryKey(STempTableNode* pTable, SNode* pExpr) {
|
|||
return isPrimaryKeyImpl(pExpr);
|
||||
}
|
||||
|
||||
static bool hasPkInTable(STableMeta* pTableMeta) {
|
||||
return pTableMeta->tableInfo.numOfColumns>=2 && pTableMeta->schema[1].flags | COL_IS_KEY;
|
||||
}
|
||||
|
||||
static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* pColSchema, int32_t tagFlag,
|
||||
SColumnNode* pCol) {
|
||||
strcpy(pCol->dbName, pTable->table.dbName);
|
||||
|
@ -930,6 +938,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
|
|||
}
|
||||
pCol->tableId = pTable->pMeta->uid;
|
||||
pCol->tableType = pTable->pMeta->tableType;
|
||||
pCol->tableHasPk = hasPkInTable(pTable->pMeta);
|
||||
pCol->colId = pColSchema->colId;
|
||||
pCol->colType = (tagFlag >= 0 ? COLUMN_TYPE_TAG : COLUMN_TYPE_COLUMN);
|
||||
pCol->hasIndex = (pColSchema != NULL && IS_IDX_ON(pColSchema));
|
||||
|
@ -4615,6 +4624,63 @@ static int32_t appendTsForImplicitTsFunc(STranslateContext* pCxt, SSelectStmt* p
|
|||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
static int32_t createPkColByTable(STranslateContext* pCxt, SRealTableNode* pTable, SNode** pPk) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCol->colId = pTable->pMeta->schema[1].colId;
|
||||
strcpy(pCol->colName, pTable->pMeta->schema[1].name);
|
||||
bool found = false;
|
||||
int32_t code = findAndSetColumn(pCxt, &pCol, (STableNode*)pTable, &found);
|
||||
if (TSDB_CODE_SUCCESS != code || !found) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTERNAL_ERROR);
|
||||
}
|
||||
*pPk = (SNode*)pCol;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static EDealRes hasPkColImpl(SNode* pNode, void* pContext) {
|
||||
if (nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->tableHasPk) {
|
||||
*(bool*)pContext = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static bool hasPkCol(SNode* pNode) {
|
||||
bool hasPk = false;
|
||||
nodesWalkExprPostOrder(pNode, hasPkColImpl, &hasPk);
|
||||
return hasPk;
|
||||
}
|
||||
|
||||
static EDealRes appendPkForPkFuncImpl(SNode* pNode, void* pContext) {
|
||||
STranslateContext* pCxt = pContext;
|
||||
STableNode* pTable = NULL;
|
||||
int32_t code = findTable(pCxt, NULL, &pTable);
|
||||
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable) && isPkFunc(pNode) && hasPkCol(pNode)) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
|
||||
SNode* pPk = NULL;
|
||||
pCxt->errCode = createPkColByTable(pCxt, pRealTable, &pPk);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pCxt->errCode = nodesListMakeStrictAppend(&pFunc->pParameterList, pPk);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pFunc->hasPk = true;
|
||||
pFunc->pkBytes = ((SColumnNode*)pPk)->node.resType.bytes;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t appendPkParamForPkFunc(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_FROM, appendPkForPkFuncImpl, pCxt);
|
||||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
typedef struct SReplaceOrderByAliasCxt {
|
||||
STranslateContext* pTranslateCxt;
|
||||
SNodeList* pProjectionList;
|
||||
|
@ -4755,6 +4821,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = appendTsForImplicitTsFunc(pCxt, pSelect);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = appendPkParamForPkFunc(pCxt, pSelect);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = replaceOrderByAliasForSelect(pCxt, pSelect);
|
||||
}
|
||||
|
@ -5939,6 +6008,14 @@ static int32_t checkTableColsSchema(STranslateContext* pCxt, SHashObj* pHash, in
|
|||
if (TSDB_CODE_SUCCESS == code && pCol->is_pk && colIndex != 1) {
|
||||
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SECOND_COL_PK);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pCol->is_pk &&
|
||||
!(TSDB_DATA_TYPE_INT == pCol->dataType.type ||
|
||||
TSDB_DATA_TYPE_UINT == pCol->dataType.type ||
|
||||
TSDB_DATA_TYPE_BIGINT == pCol->dataType.type ||
|
||||
TSDB_DATA_TYPE_UBIGINT == pCol->dataType.type ||
|
||||
TSDB_DATA_TYPE_VARCHAR == pCol->dataType.type)) {
|
||||
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_COL_PK_TYPE);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pCol->dataType.type == TSDB_DATA_TYPE_JSON) {
|
||||
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
|
||||
}
|
||||
|
|
|
@ -198,6 +198,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "tag %s can not be primary key";
|
||||
case TSDB_CODE_PAR_SECOND_COL_PK:
|
||||
return "primary key column must be second column";
|
||||
case TSDB_CODE_PAR_COL_PK_TYPE:
|
||||
return "primary key column must be of type int, uint, bigint, ubigint, and varchar";
|
||||
default:
|
||||
return "Unknown error";
|
||||
}
|
||||
|
|
|
@ -2747,6 +2747,7 @@ typedef struct SLastRowScanOptSetColDataTypeCxt {
|
|||
SNodeList* pLastCols;
|
||||
SNodeList* pOtherCols;
|
||||
int32_t funcType;
|
||||
int32_t pkBytes;
|
||||
} SLastRowScanOptSetColDataTypeCxt;
|
||||
|
||||
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
||||
|
@ -2754,12 +2755,12 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
|||
SLastRowScanOptSetColDataTypeCxt* pCxt = pContext;
|
||||
if (pCxt->doAgg) {
|
||||
nodesListMakeAppend(&pCxt->pLastCols, pNode);
|
||||
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType));
|
||||
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes);
|
||||
} else {
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pCxt->pLastCols) {
|
||||
if (nodesEqualNode(pCol, pNode)) {
|
||||
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType));
|
||||
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType), pCxt->pkBytes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -2769,14 +2770,14 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase) {
|
||||
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase, int32_t pkBytes) {
|
||||
SNode* pTarget = NULL;
|
||||
WHERE_EACH(pTarget, pTargets) {
|
||||
bool found = false;
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pLastCols) {
|
||||
if (nodesEqualNode(pCol, pTarget)) {
|
||||
getLastCacheDataType(&(((SColumnNode*)pTarget)->node.resType));
|
||||
getLastCacheDataType(&(((SColumnNode*)pTarget)->node.resType), pkBytes);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
|
@ -2883,6 +2884,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
return code;
|
||||
}
|
||||
cxt.funcType = pFunc->funcType;
|
||||
cxt.pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0;
|
||||
// add duplicate cols which be removed for both last_row, last
|
||||
if (pAgg->hasLast && pAgg->hasLastRow) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pParamNode)) {
|
||||
|
@ -2961,9 +2963,9 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
if (NULL != cxt.pLastCols) {
|
||||
cxt.doAgg = false;
|
||||
cxt.funcType = FUNCTION_TYPE_CACHE_LAST;
|
||||
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true);
|
||||
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true, cxt.pkBytes);
|
||||
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
|
||||
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false);
|
||||
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes);
|
||||
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols);
|
||||
if (pPKTsCol && pScan->node.pTargets->length == 1) {
|
||||
// when select last(ts),ts from ..., we add another ts to targets
|
||||
|
|
|
@ -622,6 +622,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is confli
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, "Operator not supported multi result")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY, "tag can not be primary key")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be second column")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
||||
|
||||
//planner
|
||||
|
|
Loading…
Reference in New Issue