fix(query): report error if certain function query stable has duplicate
timestamps TD-19892
This commit is contained in:
parent
eec1b0bfb0
commit
2b53f7be84
|
@ -2677,7 +2677,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "csum",
|
||||
.type = FUNCTION_TYPE_CSUM,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC |
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
|
||||
.translateFunc = translateCsum,
|
||||
.getEnvFunc = getCsumFuncEnv,
|
||||
|
@ -2690,7 +2690,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "mavg",
|
||||
.type = FUNCTION_TYPE_MAVG,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||
FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.translateFunc = translateMavg,
|
||||
.getEnvFunc = getMavgFuncEnv,
|
||||
.initFunc = mavgFunctionSetup,
|
||||
|
|
|
@ -48,6 +48,9 @@ typedef struct SSumRes {
|
|||
double dsum;
|
||||
};
|
||||
int16_t type;
|
||||
int64_t prevTs; // used for csum only
|
||||
bool isPrevTsSet; //used for csum only
|
||||
|
||||
} SSumRes;
|
||||
|
||||
typedef struct SAvgRes {
|
||||
|
@ -207,6 +210,8 @@ typedef enum {
|
|||
typedef struct SMavgInfo {
|
||||
int32_t pos;
|
||||
double sum;
|
||||
int64_t prevTs;
|
||||
bool isPrevTsSet;
|
||||
int32_t numOfPoints;
|
||||
bool pointsMeet;
|
||||
double points[];
|
||||
|
@ -5008,6 +5013,7 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
|
|||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
@ -5016,6 +5022,13 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t type = pInputCol->info.type;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (pSumRes->isPrevTsSet == true && tsList[i] == pSumRes->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
} else {
|
||||
pSumRes->prevTs = tsList[i];
|
||||
}
|
||||
pSumRes->isPrevTsSet = true;
|
||||
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
// colDataAppendNULL(pOutput, i);
|
||||
|
@ -5053,7 +5066,8 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
|
|||
numOfElems++;
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
pResInfo->numOfRes = numOfElems;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool getMavgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
|
@ -5069,6 +5083,8 @@ bool mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
SMavgInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
pInfo->pos = 0;
|
||||
pInfo->sum = 0;
|
||||
pInfo->prevTs = -1;
|
||||
pInfo->isPrevTsSet = false;
|
||||
pInfo->numOfPoints = pCtx->param[1].param.i;
|
||||
if (pInfo->numOfPoints < 1 || pInfo->numOfPoints > MAVG_MAX_POINTS_NUM) {
|
||||
return false;
|
||||
|
@ -5083,6 +5099,7 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
|
|||
SMavgInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
|
@ -5092,6 +5109,13 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t type = pInputCol->info.type;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (pInfo->isPrevTsSet == true && tsList[i] == pInfo->prevTs) {
|
||||
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||
} else {
|
||||
pInfo->prevTs = tsList[i];
|
||||
}
|
||||
pInfo->isPrevTsSet = true;
|
||||
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
// colDataAppendNULL(pOutput, i);
|
||||
|
@ -5136,7 +5160,8 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
return numOfElems;
|
||||
pResInfo->numOfRes = numOfElems;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) {
|
||||
|
|
Loading…
Reference in New Issue