Merge pull request #26417 from taosdata/enh/TD-29154/diff
Enh/td 29154/diff
This commit is contained in:
commit
92089e2d86
|
@ -41,6 +41,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
|||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
||||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
|
||||
typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx
|
||||
|
||||
typedef struct SScalarFuncExecFuncs {
|
||||
FExecGetEnv getEnv;
|
||||
|
@ -48,11 +49,12 @@ typedef struct SScalarFuncExecFuncs {
|
|||
} SScalarFuncExecFuncs;
|
||||
|
||||
typedef struct SFuncExecFuncs {
|
||||
FExecGetEnv getEnv;
|
||||
FExecInit init;
|
||||
FExecProcess process;
|
||||
FExecFinalize finalize;
|
||||
FExecCombine combine;
|
||||
FExecGetEnv getEnv;
|
||||
FExecInit init;
|
||||
FExecProcess process;
|
||||
FExecFinalize finalize;
|
||||
FExecCombine combine;
|
||||
processFuncByRow processFuncByRow;
|
||||
} SFuncExecFuncs;
|
||||
|
||||
#define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results
|
||||
|
|
|
@ -255,6 +255,7 @@ bool fmIsIgnoreNullFunc(int32_t funcId);
|
|||
bool fmIsConstantResFunc(SFunctionNode* pFunc);
|
||||
bool fmIsSkipScanCheckFunc(int32_t funcId);
|
||||
bool fmIsPrimaryKeyFunc(int32_t funcId);
|
||||
bool fmIsProcessByRowFunc(int32_t funcId);
|
||||
|
||||
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
|
||||
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
|
||||
|
|
|
@ -415,6 +415,7 @@ typedef struct SSelectStmt {
|
|||
int32_t returnRows; // EFuncReturnRows
|
||||
ETimeLineMode timeLineCurMode;
|
||||
ETimeLineMode timeLineResMode;
|
||||
int32_t lastProcessByRowFuncId;
|
||||
bool timeLineFromOrderBy;
|
||||
bool isEmptyResult;
|
||||
bool isSubquery;
|
||||
|
|
|
@ -835,6 +835,7 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_PAR_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0x267D)
|
||||
#define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E)
|
||||
#define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F)
|
||||
#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680)
|
||||
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
|
||||
|
||||
//planner
|
||||
|
|
|
@ -724,9 +724,12 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
|
|||
|
||||
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, SArray* pPseudoList) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
||||
pResult->info.dataLoad = 1;
|
||||
|
||||
SArray* processByRowFunctionCtx = NULL;
|
||||
|
||||
if (pSrcBlock == NULL) {
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
|
@ -743,7 +746,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
}
|
||||
|
||||
pResult->info.rows = 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (pResult != pSrcBlock) {
|
||||
|
@ -816,10 +819,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
|
||||
|
||||
SScalarParam dest = {.columnData = &idata};
|
||||
int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
||||
code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pBlockList);
|
||||
return code;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||
|
@ -852,11 +855,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
pfCtx->pDstBlock = pResult;
|
||||
}
|
||||
|
||||
int32_t code = pfCtx->fpSet.process(pfCtx);
|
||||
code = pfCtx->fpSet.process(pfCtx);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
goto _exit;
|
||||
}
|
||||
numOfRows = pResInfo->numOfRes;
|
||||
if (fmIsProcessByRowFunc(pfCtx->functionId)) {
|
||||
if (NULL == processByRowFunctionCtx) {
|
||||
processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
|
||||
if (!processByRowFunctionCtx) {
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
taosArrayPush(processByRowFunctionCtx, &pfCtx);
|
||||
}
|
||||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||
// selective value output should be set during corresponding function execution
|
||||
if (fmIsSelectValueFunc(pfCtx->functionId)) {
|
||||
|
@ -886,10 +899,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
|
||||
|
||||
SScalarParam dest = {.columnData = &idata};
|
||||
int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||
code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pBlockList);
|
||||
return code;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||
|
@ -905,9 +918,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
}
|
||||
}
|
||||
|
||||
if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0){
|
||||
SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
|
||||
code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _exit;
|
||||
}
|
||||
numOfRows = (*pfCtx)->resultInfo->numOfRes;
|
||||
}
|
||||
if (!createNewColModel) {
|
||||
pResult->info.rows += numOfRows;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_exit:
|
||||
if(processByRowFunctionCtx) {
|
||||
taosArrayDestroy(processByRowFunctionCtx);
|
||||
processByRowFunctionCtx = NULL;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ typedef struct SBuiltinFuncDefinition {
|
|||
const char* pStateFunc;
|
||||
FCreateMergeFuncParameters createMergeParaFuc;
|
||||
FEstimateReturnRows estimateReturnRowsFunc;
|
||||
processFuncByRow processFuncByRow;
|
||||
} SBuiltinFuncDefinition;
|
||||
|
||||
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
|
||||
|
|
|
@ -133,6 +133,7 @@ int32_t getApercentileMaxSize();
|
|||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t diffFunctionByRow(SArray* pCtx);
|
||||
|
||||
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
|
||||
|
|
|
@ -57,6 +57,7 @@ extern "C" {
|
|||
#define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(28)
|
||||
#define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29)
|
||||
#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found
|
||||
#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
|
|
|
@ -1965,9 +1965,9 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
}
|
||||
|
||||
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||
if (pValue->datum.i != 0 && pValue->datum.i != 1) {
|
||||
if (pValue->datum.i < 0 || pValue->datum.i > 3) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"Second parameter of DIFF function should be only 0 or 1");
|
||||
"Second parameter of DIFF function should be a number between 0 and 3.");
|
||||
}
|
||||
|
||||
pValue->notReserved = true;
|
||||
|
@ -1977,7 +1977,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
if (IS_SIGNED_NUMERIC_TYPE(colType) || IS_TIMESTAMP_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) {
|
||||
resType = TSDB_DATA_TYPE_BIGINT;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
|
||||
resType = TSDB_DATA_TYPE_UBIGINT;
|
||||
resType = TSDB_DATA_TYPE_BIGINT;
|
||||
} else {
|
||||
resType = TSDB_DATA_TYPE_DOUBLE;
|
||||
}
|
||||
|
@ -1989,7 +1989,7 @@ static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) {
|
|||
if (1 == LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return FUNC_RETURN_ROWS_N_MINUS_1;
|
||||
}
|
||||
return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
|
||||
return 1 < ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
|
||||
: FUNC_RETURN_ROWS_N_MINUS_1;
|
||||
}
|
||||
|
||||
|
@ -3206,7 +3206,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_PROCESS_BY_ROW |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
|
@ -3215,6 +3215,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = diffScalarFunction,
|
||||
.finalizeFunc = functionFinalize,
|
||||
.estimateReturnRowsFunc = diffEstReturnRows,
|
||||
.processFuncByRow = diffFunctionByRow,
|
||||
},
|
||||
{
|
||||
.name = "statecount",
|
||||
|
|
|
@ -110,10 +110,9 @@ typedef enum {
|
|||
} EAPerctAlgoType;
|
||||
|
||||
typedef struct SDiffInfo {
|
||||
bool hasPrev;
|
||||
bool includeNull;
|
||||
bool ignoreNegative; // replace the ignore with case when
|
||||
bool firstOutput;
|
||||
bool hasPrev;
|
||||
bool isFirstRow;
|
||||
int8_t ignoreOption; // replace the ignore with case when
|
||||
union {
|
||||
int64_t i64;
|
||||
double d64;
|
||||
|
@ -122,6 +121,12 @@ typedef struct SDiffInfo {
|
|||
int64_t prevTs;
|
||||
} SDiffInfo;
|
||||
|
||||
bool ignoreNegative(int8_t ignoreOption){
|
||||
return (ignoreOption & 0x1) == 0x1;
|
||||
}
|
||||
bool ignoreNull(int8_t ignoreOption){
|
||||
return (ignoreOption & 0x2) == 0x2;
|
||||
}
|
||||
typedef struct SSpreadInfo {
|
||||
double result;
|
||||
bool hasResult;
|
||||
|
@ -3100,15 +3105,14 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
|||
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pDiffInfo->hasPrev = false;
|
||||
pDiffInfo->isFirstRow = true;
|
||||
pDiffInfo->prev.i64 = 0;
|
||||
pDiffInfo->prevTs = -1;
|
||||
if (pCtx->numOfParams > 1) {
|
||||
pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
|
||||
pDiffInfo->ignoreOption = pCtx->param[1].param.i; // TODO set correct param
|
||||
} else {
|
||||
pDiffInfo->ignoreNegative = false;
|
||||
pDiffInfo->ignoreOption = 0;
|
||||
}
|
||||
pDiffInfo->includeNull = true;
|
||||
pDiffInfo->firstOutput = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -3144,91 +3148,153 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
|
|||
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
|
||||
}
|
||||
pDiffInfo->prevTs = ts;
|
||||
|
||||
pDiffInfo->hasPrev = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
int64_t v = *(uint32_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int64_t v = *(int32_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BOOL: {
|
||||
int64_t v = *(bool*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
int64_t v = *(uint8_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int64_t v = *(int8_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
int64_t v = *(uint16_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int64_t v = *(int16_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UBIGINT:{
|
||||
uint64_t v = *(uint64_t*)pv;
|
||||
return v < (uint64_t)pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t v = *(int64_t*)pv;
|
||||
return v < pDiffInfo->prev.i64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float v = *(float*)pv;
|
||||
return v < pDiffInfo->prev.d64;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double v = *(double*)pv;
|
||||
return v < pDiffInfo->prev.d64;
|
||||
}
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void tryToSetInt64(SDiffInfo* pDiffInfo, int32_t type, SColumnInfoData* pOutput, int64_t v, int32_t pos) {
|
||||
bool isNegative = v < pDiffInfo->prev.i64;
|
||||
if(type == TSDB_DATA_TYPE_UBIGINT){
|
||||
isNegative = (uint64_t)v < (uint64_t)pDiffInfo->prev.i64;
|
||||
}
|
||||
int64_t delta = v - pDiffInfo->prev.i64;
|
||||
if (isNegative && ignoreNegative(pDiffInfo->ignoreOption)) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
pOutput->hasNull = true;
|
||||
} else {
|
||||
colDataSetInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
}
|
||||
|
||||
static void tryToSetDouble(SDiffInfo* pDiffInfo, SColumnInfoData* pOutput, double v, int32_t pos) {
|
||||
double delta = v - pDiffInfo->prev.d64;
|
||||
if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetDouble(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.d64 = v;
|
||||
}
|
||||
|
||||
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
|
||||
int64_t ts) {
|
||||
if (!pDiffInfo->hasPrev) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
return doSetPrevVal(pDiffInfo, type, pv, ts);
|
||||
}
|
||||
pDiffInfo->prevTs = ts;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
int64_t v = *(uint32_t*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t v = *(int32_t*)pv;
|
||||
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
|
||||
int64_t v = *(int32_t*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BOOL: {
|
||||
int64_t v = *(bool*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
int64_t v = *(uint8_t*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t v = *(int8_t*)pv;
|
||||
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
int64_t v = *(int8_t*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT:{
|
||||
int64_t v = *(uint16_t*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t v = *(int16_t*)pv;
|
||||
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
int64_t v = *(int16_t*)pv;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t v = *(int64_t*)pv;
|
||||
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
tryToSetInt64(pDiffInfo, type, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float v = *(float*)pv;
|
||||
double delta = v - pDiffInfo->prev.d64; // direct previous may be null
|
||||
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetDouble(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.d64 = v;
|
||||
double v = *(float*)pv;
|
||||
tryToSetDouble(pDiffInfo, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double v = *(double*)pv;
|
||||
double delta = v - pDiffInfo->prev.d64; // direct previous may be null
|
||||
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
} else {
|
||||
colDataSetDouble(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.d64 = v;
|
||||
tryToSetDouble(pDiffInfo, pOutput, v, pos);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
|
||||
}
|
||||
|
||||
pDiffInfo->hasPrev = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3271,71 +3337,155 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f
|
|||
}
|
||||
}
|
||||
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pRow->isDataNull || !pDiffInfo->hasPrev ) {
|
||||
return true;
|
||||
} else if (ignoreNegative(pDiffInfo->ignoreOption)){
|
||||
return diffIsNegtive(pDiffInfo, pCtx->input.pData[0]->info.type, pRow->pData);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool isFirstRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
return pDiffInfo->isFirstRow;
|
||||
}
|
||||
|
||||
int32_t trySetPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pDiffInfo->isFirstRow = false;
|
||||
if (pRow->isDataNull) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
int8_t inputType = pInputCol->info.type;
|
||||
|
||||
char* pv = pRow->pData;
|
||||
return doSetPrevVal(pDiffInfo, inputType, pv, pRow->ts);
|
||||
}
|
||||
|
||||
int32_t setDoDiffResult(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
int8_t inputType = pInputCol->info.type;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
int8_t inputType = pInputCol->info.type;
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
if (pRow->isDataNull) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
pOutput->hasNull = true;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
funcInputUpdate(pCtx);
|
||||
|
||||
SFuncInputRow row = {0};
|
||||
while (funcInputGetNextRow(pCtx, &row)) {
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
|
||||
if (row.isDataNull) {
|
||||
if (pDiffInfo->includeNull) {
|
||||
colDataSetNull_f_s(pOutput, pos);
|
||||
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
|
||||
}
|
||||
|
||||
numOfElems += 1;
|
||||
}
|
||||
continue;
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos);
|
||||
}
|
||||
|
||||
char* pv = row.pData;
|
||||
|
||||
if (pDiffInfo->hasPrev) {
|
||||
if (row.ts == pDiffInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, row.ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
} else {
|
||||
int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, row.ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pDiffInfo->hasPrev = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char* pv = pRow->pData;
|
||||
|
||||
if (pRow->ts == pDiffInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, pRow->ts);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos);
|
||||
}
|
||||
|
||||
pResInfo->numOfRes = numOfElems;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t diffFunctionByRow(SArray* pCtxArray) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int diffColNum = pCtxArray->size;
|
||||
if(diffColNum == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
SArray* pRows = taosArrayInit_s(sizeof(SFuncInputRow), diffColNum);
|
||||
|
||||
bool keepNull = false;
|
||||
for (int i = 0; i < diffColNum; ++i) {
|
||||
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
|
||||
funcInputUpdate(pCtx);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
if (!ignoreNull(pDiffInfo->ignoreOption)) {
|
||||
keepNull = true;
|
||||
}
|
||||
}
|
||||
|
||||
SqlFunctionCtx* pCtx0 = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, 0);
|
||||
SFuncInputRow* pRow0 = (SFuncInputRow*)taosArrayGet(pRows, 0);
|
||||
int32_t startOffset = pCtx0->offset;
|
||||
while (funcInputGetNextRow(pCtx0, pRow0)) {
|
||||
bool hasNotNullValue = !diffResultIsNull(pCtx0, pRow0);
|
||||
for (int i = 1; i < diffColNum; ++i) {
|
||||
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
|
||||
SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i);
|
||||
if(!funcInputGetNextRow(pCtx, pRow)) {
|
||||
// rows are not equal
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
goto _exit;
|
||||
}
|
||||
if (!diffResultIsNull(pCtx, pRow)) {
|
||||
hasNotNullValue = true;
|
||||
}
|
||||
}
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
|
||||
bool newRow = false;
|
||||
for (int i = 0; i < diffColNum; ++i) {
|
||||
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
|
||||
SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i);
|
||||
if ((keepNull || hasNotNullValue) && !isFirstRow(pCtx, pRow)){
|
||||
code = setDoDiffResult(pCtx, pRow, pos);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _exit;
|
||||
}
|
||||
newRow = true;
|
||||
} else {
|
||||
code = trySetPreVal(pCtx, pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (newRow) ++numOfElems;
|
||||
}
|
||||
|
||||
for (int i = 0; i < diffColNum; ++i) {
|
||||
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->numOfRes = numOfElems;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (pRows) {
|
||||
taosArrayDestroy(pRows);
|
||||
pRows = NULL;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t getTopBotInfoSize(int64_t numOfItems) { return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); }
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
|
|
@ -141,6 +141,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
|||
pFpSet->process = funcMgtBuiltins[funcId].processFunc;
|
||||
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
|
||||
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
|
||||
pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -274,6 +275,8 @@ bool fmIsBlockDistFunc(int32_t funcId) {
|
|||
return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
|
||||
}
|
||||
|
||||
bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); }
|
||||
|
||||
bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); }
|
||||
|
||||
void fmFuncMgtDestroy() {
|
||||
|
|
|
@ -2194,11 +2194,17 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod
|
|||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||
if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc ||
|
||||
(pSelect->hasIndefiniteRowsFunc &&
|
||||
(FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc)))) {
|
||||
if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
if (pSelect->hasIndefiniteRowsFunc &&
|
||||
(FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc)) &&
|
||||
(pSelect->lastProcessByRowFuncId == -1 || !fmIsProcessByRowFunc(pFunc->funcId))) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
if (pSelect->lastProcessByRowFuncId != -1 && pSelect->lastProcessByRowFuncId != pFunc->funcId) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC);
|
||||
}
|
||||
if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
|
||||
"%s function is not supported in window query or group query", pFunc->functionName);
|
||||
|
@ -2462,6 +2468,9 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
|||
} else if (fmIsInterpFunc(pFunc->funcId)) {
|
||||
pSelect->returnRows = fmGetFuncReturnRows(pFunc);
|
||||
}
|
||||
if (fmIsProcessByRowFunc(pFunc->funcId)) {
|
||||
pSelect->lastProcessByRowFuncId = pFunc->funcId;
|
||||
}
|
||||
|
||||
pSelect->hasMultiRowsFunc = pSelect->hasMultiRowsFunc ? true : fmIsMultiRowsFunc(pFunc->funcId);
|
||||
if (fmIsSelectFunc(pFunc->funcId)) {
|
||||
|
@ -3398,6 +3407,7 @@ static int32_t checkIsEmptyResult(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
static int32_t resetSelectFuncNumWithoutDup(SSelectStmt* pSelect) {
|
||||
if (pSelect->selectFuncNum <= 1) return TSDB_CODE_SUCCESS;
|
||||
pSelect->selectFuncNum = 0;
|
||||
pSelect->lastProcessByRowFuncId = -1;
|
||||
SNodeList* pNodeList = nodesMakeList();
|
||||
int32_t code = nodesCollectSelectFuncs(pSelect, SQL_CLAUSE_FROM, NULL, fmIsSelectFunc, pNodeList);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
|
@ -221,6 +221,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Table name:%s duplicated";
|
||||
case TSDB_CODE_PAR_TAG_NAME_DUPLICATED:
|
||||
return "Tag name:%s duplicated";
|
||||
case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC:
|
||||
return "Some functions cannot appear in the select list at the same time";
|
||||
default:
|
||||
return "Unknown error";
|
||||
}
|
||||
|
@ -772,6 +774,7 @@ SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode*
|
|||
select->onlyHasKeepOrderFunc = true;
|
||||
select->timeRange = TSWINDOW_INITIALIZER;
|
||||
select->pHint = pHint;
|
||||
select->lastProcessByRowFuncId = -1;
|
||||
return (SNode*)select;
|
||||
}
|
||||
|
||||
|
|
|
@ -682,6 +682,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname not set")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
|
||||
|
||||
//planner
|
||||
|
|
|
@ -45,12 +45,357 @@ class TDTestCase:
|
|||
else:
|
||||
tdSql.checkData(i, j, 1)
|
||||
|
||||
def ignoreTest(self):
|
||||
dbname = "db"
|
||||
|
||||
ts1 = 1694912400000
|
||||
tdSql.execute(f'''create table {dbname}.stb30749(ts timestamp, col1 tinyint, col2 smallint) tags(loc nchar(20))''')
|
||||
tdSql.execute(f"create table {dbname}.stb30749_1 using {dbname}.stb30749 tags('shanghai')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, null, 1)" % (ts1 + 1))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 3, null)" % (ts1 + 2))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 4, 3)" % (ts1 + 3))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 1, 1)" % (ts1 + 4))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 2, null)" % (ts1 + 5))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, null, null)" % (ts1 + 6))
|
||||
|
||||
tdSql.query(f"select ts, diff(col1) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(2, 1, -3)
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(4, 0, '2023-09-17 09:00:00.006')
|
||||
tdSql.checkData(4, 1, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 1) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(4, 0, '2023-09-17 09:00:00.006')
|
||||
tdSql.checkData(4, 1, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 2) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(1, 1, -3)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(2, 1, 1)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(1, 1, 1)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 0) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(1, 2, 2)
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(2, 2, -2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 1) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(1, 2, 2)
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(2, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 2), diff(col2, 2) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, -3)
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
tdSql.checkData(1, 2, -2)
|
||||
tdSql.checkData(2, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, None)
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
tdSql.checkData(1, 2, -2)
|
||||
tdSql.checkData(2, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 3) from {dbname}.stb30749_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
tdSql.checkData(1, 2, None)
|
||||
|
||||
tdSql.execute(f"create table {dbname}.stb30749_2 using {dbname}.stb30749 tags('shanghai')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, 1)" % (ts1 - 1))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, 4, 3)" % (ts1 + 0))
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, null)" % (ts1 + 10))
|
||||
|
||||
tdSql.query(f"select ts, diff(col1), diff(col2, 1) from {dbname}.stb30749")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, -1)
|
||||
tdSql.checkData(2, 2, None)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1), diff(col2) from {dbname}.stb30749")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, -1)
|
||||
tdSql.checkData(2, 2, None)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1), diff(col2, 3) from {dbname}.stb30749")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, -1)
|
||||
tdSql.checkData(2, 2, None)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 1), diff(col2, 2) from {dbname}.stb30749")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(2, 2, None)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 1), diff(col2, 3) from {dbname}.stb30749")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(2, 2, None)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 2), diff(col2, 2) from {dbname}.stb30749")
|
||||
tdSql.checkRows(6)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, -1)
|
||||
tdSql.checkData(2, 2, None)
|
||||
tdSql.checkData(3, 1, 1)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(2, 2, 2)
|
||||
tdSql.checkData(3, 1, None)
|
||||
tdSql.checkData(3, 2, -2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 2), diff(col2, 3) from {dbname}.stb30749")
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.004')
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(2, 2, 2)
|
||||
tdSql.checkData(3, 1, -3)
|
||||
tdSql.checkData(3, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 3) from {dbname}.stb30749")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 0, '2023-09-17 09:00:00.005')
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(1, 2, 2)
|
||||
tdSql.checkData(2, 1, 1)
|
||||
tdSql.checkData(2, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1), diff(col2) from {dbname}.stb30749 partition by tbname")
|
||||
tdSql.checkRows(7)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(0, 2, None)
|
||||
tdSql.checkData(1, 1, 1)
|
||||
tdSql.checkData(1, 2, 2)
|
||||
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749 partition by tbname")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.000')
|
||||
tdSql.checkData(3, 1, None)
|
||||
tdSql.checkData(3, 2, 2)
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, 1)" % (ts1 + 1))
|
||||
tdSql.error(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749")
|
||||
|
||||
def withPkTest(self):
|
||||
dbname = "db"
|
||||
|
||||
ts1 = 1694912400000
|
||||
tdSql.execute(f'''create table {dbname}.stb5(ts timestamp, col1 int PRIMARY KEY, col2 smallint) tags(loc nchar(20))''')
|
||||
tdSql.execute(f"create table {dbname}.stb5_1 using {dbname}.stb5 tags('shanghai')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb5_1 values(%d, 2, 1)" % (ts1 + 1))
|
||||
tdSql.execute(f"insert into {dbname}.stb5_1 values(%d, 3, null)" % (ts1 + 2))
|
||||
tdSql.execute(f"insert into {dbname}.stb5_1 values(%d, 4, 3)" % (ts1 + 3))
|
||||
|
||||
tdSql.execute(f"create table {dbname}.stb5_2 using {dbname}.stb5 tags('shanghai')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb5_2 values(%d, 5, 4)" % (ts1 + 1))
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb5")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb5_2 values(%d, 3, 3)" % (ts1 + 2))
|
||||
tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb5")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
|
||||
def intOverflowTest(self):
|
||||
dbname = "db"
|
||||
|
||||
ts1 = 1694912400000
|
||||
tdSql.execute(f'''create table {dbname}.stb6(ts timestamp, c1 int, c2 smallint, c3 int unsigned, c4 BIGINT, c5 BIGINT unsigned) tags(loc nchar(20))''')
|
||||
tdSql.execute(f"create table {dbname}.stb6_1 using {dbname}.stb6 tags('shanghai')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -2147483648, -32768, 0, 9223372036854775806, 9223372036854775806)" % (ts1 + 1))
|
||||
tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, 2147483647, 32767, 4294967295, 0, 0)" % (ts1 + 2))
|
||||
tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, -9223372036854775806, 16223372036854775806)" % (ts1 + 3))
|
||||
|
||||
tdSql.query(f"select ts, diff(c1), diff(c2), diff(c3), diff(c4), diff(c5) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.002')
|
||||
tdSql.checkData(0, 1, 4294967295)
|
||||
tdSql.checkData(0, 2, 65535)
|
||||
tdSql.checkData(0, 3, 4294967295)
|
||||
tdSql.checkData(0, 4, -9223372036854775806)
|
||||
tdSql.checkData(0, 5, -9223372036854775806)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 1, -2147483657)
|
||||
tdSql.checkData(1, 2, -32777)
|
||||
tdSql.checkData(1, 3, -4294967295)
|
||||
tdSql.checkData(1, 4, -9223372036854775806)
|
||||
|
||||
tdSql.query(f"select ts, diff(c1, 1), diff(c2) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 4294967295)
|
||||
tdSql.checkData(0, 2, 65535)
|
||||
tdSql.checkData(1, 1, None)
|
||||
tdSql.checkData(1, 2, -32777)
|
||||
|
||||
tdSql.query(f"select ts, diff(c1, 1), diff(c2, 1) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 4294967295)
|
||||
tdSql.checkData(0, 2, 65535)
|
||||
tdSql.checkData(1, 1, None)
|
||||
tdSql.checkData(1, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(c1, 2), diff(c2, 3) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 4294967295)
|
||||
tdSql.checkData(0, 2, 65535)
|
||||
tdSql.checkData(1, 1, -2147483657)
|
||||
tdSql.checkData(1, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(c1, 3), diff(c2, 3) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 4294967295)
|
||||
tdSql.checkData(0, 2, 65535)
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, 9223372036854775800, 0)" % (ts1 + 4))
|
||||
tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, 9223372036854775800, 16223372036854775806)" % (ts1 + 5))
|
||||
|
||||
tdSql.query(f"select ts, diff(c4, 0) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query(f"select ts, diff(c4, 1) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkData(2, 1, -10)
|
||||
|
||||
tdSql.query(f"select ts, diff(c4, 2) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query(f"select ts, diff(c4, 3) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, -10)
|
||||
tdSql.checkData(1, 1, 0)
|
||||
|
||||
tdSql.query(f"select ts, diff(c5, 0) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query(f"select ts, diff(c5, 1) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(2, 1, None)
|
||||
tdSql.checkData(3, 0, '2023-09-17 09:00:00.005')
|
||||
|
||||
tdSql.query(f"select ts, diff(c5, 2) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query(f"select ts, diff(c5, 3) from {dbname}.stb6_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
tdSql.checkData(1, 0, '2023-09-17 09:00:00.005')
|
||||
|
||||
def doubleOverflowTest(self):
|
||||
dbname = "db"
|
||||
|
||||
ts1 = 1694912400000
|
||||
tdSql.execute(f'''create table {dbname}.stb7(ts timestamp, c1 float, c2 double) tags(loc nchar(20))''')
|
||||
tdSql.execute(f"create table {dbname}.stb7_1 using {dbname}.stb7 tags('shanghai')")
|
||||
|
||||
tdSql.execute(f"insert into {dbname}.stb7_1 values(%d, 334567777777777777777343434343333333733, 334567777777777777777343434343333333733)" % (ts1 + 1))
|
||||
tdSql.execute(f"insert into {dbname}.stb7_1 values(%d, -334567777777777777777343434343333333733, -334567777777777777777343434343333333733)" % (ts1 + 2))
|
||||
tdSql.execute(f"insert into {dbname}.stb7_1 values(%d, 334567777777777777777343434343333333733, 334567777777777777777343434343333333733)" % (ts1 + 3))
|
||||
|
||||
tdSql.query(f"select ts, diff(c1), diff(c2) from {dbname}.stb7_1")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.query(f"select ts, diff(c1, 1), diff(c2, 1) from {dbname}.stb7_1")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, None)
|
||||
tdSql.checkData(0, 2, None)
|
||||
|
||||
tdSql.query(f"select ts, diff(c1, 3), diff(c2, 3) from {dbname}.stb7_1")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '2023-09-17 09:00:00.003')
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
dbname = "db"
|
||||
|
||||
# full type test
|
||||
self.full_datatype_test()
|
||||
|
||||
self.ignoreTest()
|
||||
self.withPkTest()
|
||||
self.intOverflowTest()
|
||||
self.doubleOverflowTest()
|
||||
|
||||
tdSql.execute(
|
||||
f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)")
|
||||
|
@ -219,9 +564,18 @@ class TDTestCase:
|
|||
tdSql.error(f"select diff(col1,1.23) from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1,-1) from {dbname}.stb_1")
|
||||
tdSql.query(f"select ts,diff(col1),ts from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1, 1),diff(col2) from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1, 1),diff(col2, 0) from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1, 1),diff(col2, 1) from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1, -1) from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1, 4) from {dbname}.stb_1")
|
||||
tdSql.error(f"select diff(col1, 1),diff(col2, 4) from {dbname}.stb_1")
|
||||
|
||||
tdSql.query(f"select diff(col1, 1),diff(col2) from {dbname}.stb_1")
|
||||
tdSql.checkRows(self.rowNum)
|
||||
|
||||
tdSql.query(f"select diff(col1, 1),diff(col2, 0) from {dbname}.stb_1")
|
||||
tdSql.checkRows(self.rowNum)
|
||||
|
||||
tdSql.query(f"select diff(col1, 1),diff(col2, 1) from {dbname}.stb_1")
|
||||
tdSql.checkRows(self.rowNum)
|
||||
|
||||
tdSql.query(f"select diff(ts) from {dbname}.stb_1")
|
||||
tdSql.checkRows(10)
|
||||
|
|
|
@ -172,7 +172,7 @@ class TDTestCase:
|
|||
tdSql.checkRows(90)
|
||||
|
||||
tdSql.query(f"select c1 , diff(c1 , 0) from {dbname}.stb partition by c1")
|
||||
tdSql.checkRows(140)
|
||||
tdSql.checkRows(139)
|
||||
|
||||
tdSql.query(f"select c1 , csum(c1) from {dbname}.stb partition by c1")
|
||||
tdSql.checkRows(100)
|
||||
|
|
Loading…
Reference in New Issue