fix(query): report error if certain function query stable has duplicate
timestamps TD-19892
This commit is contained in:
parent
a63e79e334
commit
1cba568604
|
@ -653,7 +653,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
pfCtx->pDstBlock = pResult;
|
||||
}
|
||||
|
||||
numOfRows = pfCtx->fpSet.process(pfCtx);
|
||||
int32_t code = pfCtx->fpSet.process(pfCtx);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
numOfRows = pResInfo->numOfRes;
|
||||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||
// selective value output should be set during corresponding function execution
|
||||
if (fmIsSelectValueFunc(pfCtx->functionId)) {
|
||||
|
|
|
@ -2640,8 +2640,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC |
|
||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
|
|
|
@ -3325,6 +3325,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
|||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pDiffInfo->hasPrev = false;
|
||||
pDiffInfo->prev.i64 = 0;
|
||||
pDiffInfo->prevTs = -1;
|
||||
if (pCtx->numOfParams > 1) {
|
||||
pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
|
||||
} else {
|
||||
|
@ -3335,7 +3336,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
|
||||
static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, int64_t ts) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pDiffInfo->prev.i64 = *(bool*)pv ? 1 : 0;
|
||||
|
@ -3362,11 +3363,13 @@ static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
|
|||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
pDiffInfo->prevTs = ts;
|
||||
}
|
||||
|
||||
static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
|
||||
int32_t order) {
|
||||
int32_t order, int64_t ts) {
|
||||
int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
pDiffInfo->prevTs = ts;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t v = *(int32_t*)pv;
|
||||
|
@ -3450,6 +3453,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
|
||||
|
@ -3471,7 +3476,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
char* pv = colDataGetData(pInputCol, i);
|
||||
|
||||
if (pDiffInfo->hasPrev) {
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||
if (tsList[i] == pDiffInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order, tsList[i]);
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
|
@ -3479,7 +3487,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
numOfElems++;
|
||||
} else {
|
||||
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
|
||||
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]);
|
||||
}
|
||||
|
||||
pDiffInfo->hasPrev = true;
|
||||
|
@ -3501,7 +3509,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
// there is a row of previous data block to be handled in the first place.
|
||||
if (pDiffInfo->hasPrev) {
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||
if (tsList[i] == pDiffInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order, tsList[i]);
|
||||
// handle selectivity
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
appendSelectivityValue(pCtx, i, pos);
|
||||
|
@ -3509,15 +3520,15 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
numOfElems++;
|
||||
} else {
|
||||
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
|
||||
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]);
|
||||
}
|
||||
|
||||
pDiffInfo->hasPrev = true;
|
||||
}
|
||||
}
|
||||
|
||||
// initial value is not set yet
|
||||
return numOfElems;
|
||||
pResInfo->numOfRes = numOfElems;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getTopBotInfoSize(int64_t numOfItems) { return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); }
|
||||
|
@ -6137,6 +6148,9 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
if (tsList[i] == pDerivInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs);
|
||||
if (pDerivInfo->ignoreNegative && r < 0) {
|
||||
} else {
|
||||
|
@ -6175,6 +6189,9 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
if (!pDerivInfo->valueSet) { // initial value is not set yet
|
||||
pDerivInfo->valueSet = true;
|
||||
} else {
|
||||
if (tsList[i] == pDerivInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - tsList[i]);
|
||||
if (pDerivInfo->ignoreNegative && r < 0) {
|
||||
} else {
|
||||
|
@ -6202,7 +6219,9 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
pResInfo->numOfRes = numOfElems;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
@ -6267,11 +6286,15 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
|
|||
pRateInfo->lastKey = tsList[i];
|
||||
|
||||
continue;
|
||||
} else if (tsList[i] == pRateInfo->lastKey) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
|
||||
if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) {
|
||||
pRateInfo->firstValue = v;
|
||||
pRateInfo->firstKey = tsList[i];
|
||||
} else if (tsList[i] == pRateInfo->firstKey) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue