fix(query): report error if certain function query stable has duplicate
timestamps TD-19892
This commit is contained in:
parent
1cba568604
commit
eec1b0bfb0
|
@ -2653,7 +2653,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "statecount",
|
.name = "statecount",
|
||||||
.type = FUNCTION_TYPE_STATE_COUNT,
|
.type = FUNCTION_TYPE_STATE_COUNT,
|
||||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||||
|
FUNC_MGT_FORBID_STREAM_FUNC,
|
||||||
.translateFunc = translateStateCount,
|
.translateFunc = translateStateCount,
|
||||||
.getEnvFunc = getStateFuncEnv,
|
.getEnvFunc = getStateFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -2664,7 +2665,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "stateduration",
|
.name = "stateduration",
|
||||||
.type = FUNCTION_TYPE_STATE_DURATION,
|
.type = FUNCTION_TYPE_STATE_DURATION,
|
||||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||||
|
FUNC_MGT_FORBID_STREAM_FUNC,
|
||||||
.translateFunc = translateStateDuration,
|
.translateFunc = translateStateDuration,
|
||||||
.getEnvFunc = getStateFuncEnv,
|
.getEnvFunc = getStateFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -2675,7 +2677,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "csum",
|
.name = "csum",
|
||||||
.type = FUNCTION_TYPE_CSUM,
|
.type = FUNCTION_TYPE_CSUM,
|
||||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC |
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC |
|
||||||
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
|
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
|
||||||
.translateFunc = translateCsum,
|
.translateFunc = translateCsum,
|
||||||
.getEnvFunc = getCsumFuncEnv,
|
.getEnvFunc = getCsumFuncEnv,
|
||||||
|
|
|
@ -190,6 +190,8 @@ typedef struct SStateInfo {
|
||||||
int64_t count;
|
int64_t count;
|
||||||
int64_t durationStart;
|
int64_t durationStart;
|
||||||
};
|
};
|
||||||
|
int64_t prevTs;
|
||||||
|
bool isPrevTsSet;
|
||||||
} SStateInfo;
|
} SStateInfo;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -4879,6 +4881,7 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
||||||
SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
|
@ -4891,7 +4894,15 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
if (pInfo->isPrevTsSet == true && tsList[i] == pInfo->prevTs) {
|
||||||
|
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||||
|
} else {
|
||||||
|
pInfo->prevTs = tsList[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->isPrevTsSet = true;
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
colDataAppendNULL(pOutput, i);
|
colDataAppendNULL(pOutput, i);
|
||||||
// handle selectivity
|
// handle selectivity
|
||||||
|
@ -4917,7 +4928,8 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return numOfElems;
|
pResInfo->numOfRes = numOfElems;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
@ -4940,11 +4952,19 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int8_t op = getStateOpType(varDataVal(pCtx->param[1].param.pz));
|
int8_t op = getStateOpType(varDataVal(pCtx->param[1].param.pz));
|
||||||
if (STATE_OPER_INVALID == op) {
|
if (STATE_OPER_INVALID == op) {
|
||||||
return 0;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
if (pInfo->isPrevTsSet == true && tsList[i] == pInfo->prevTs) {
|
||||||
|
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
|
||||||
|
} else {
|
||||||
|
pInfo->prevTs = tsList[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->isPrevTsSet = true;
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
colDataAppendNULL(pOutput, i);
|
colDataAppendNULL(pOutput, i);
|
||||||
// handle selectivity
|
// handle selectivity
|
||||||
|
@ -4974,7 +4994,8 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return numOfElems;
|
pResInfo->numOfRes = numOfElems;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
|
Loading…
Reference in New Issue