From d4c783b5901db4d509ad63825e0767ce81c62918 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 09:48:43 +0800 Subject: [PATCH] check duplicate timestamps --- source/libs/executor/src/timesliceoperator.c | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index f0e25d8cc5..4d519f82f1 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -36,6 +36,8 @@ typedef struct STimeSliceOperatorInfo { SColumn tsCol; // primary timestamp column SExprSupp scalarSup; // scalar calculation struct SFillColInfo* pFillColInfo; // fill column info + int64_t prevTs; + bool prevTsSet; } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -166,6 +168,28 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); } +static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol, + int32_t curIndex, int32_t rows) { + + + int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex); + if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) { + return true; + } + + pSliceInfo->prevTsSet = true; + pSliceInfo->prevTs = currentTs; + + if (curIndex < rows - 1) { + int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); + if (currentTs == nextTs) { + return true; + } + } + + return false; +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { int32_t rows = pResBlock->info.rows; @@ -472,6 +496,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + // check for duplicate timestamps + if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); + } + if (pSliceInfo->current > pSliceInfo->win.ekey) { setOperatorCompleted(pOperator); break; @@ -612,6 +641,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; pInfo->current = pInfo->win.skey; + pInfo->prevTsSet = false; + pInfo->prevTs = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;