From 3fba8182a5ab831980b4ddddb8b755ae78e63666 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Jul 2022 13:02:33 +0800 Subject: [PATCH 1/4] fix(query): fix time window generating bug. --- source/common/src/ttime.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 944ee6a731..9b044d6e93 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -844,11 +844,14 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio } else { // try to move current window to the left-hande-side, due to the offset effect. int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1; - ASSERT(end >= t); - end = taosTimeAdd(end, -pInterval->sliding, pInterval->slidingUnit, precision); - if (end >= t) { - start = taosTimeAdd(start, -pInterval->sliding, pInterval->slidingUnit, precision); + + int64_t newEnd = end; + while(newEnd >= t) { + end = newEnd; + newEnd = taosTimeAdd(newEnd, -pInterval->sliding, pInterval->slidingUnit, precision); } + + start = taosTimeAdd(end, -pInterval->interval + 1, pInterval->intervalUnit, precision); } } From facf3c86489667465391bbc9d9ef68bf682fa45e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Jul 2022 14:07:48 +0800 Subject: [PATCH 2/4] fix(query): add limit/offset for order by operator. --- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/cachescanoperator.c | 2 +- source/libs/executor/src/executorimpl.c | 18 +++--- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/joinoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 6 +- source/libs/executor/src/sortoperator.c | 58 +++++++++++++------ source/libs/executor/src/timewindowoperator.c | 20 +++---- 8 files changed, 67 insertions(+), 47 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d7c283c70d..9142af4a6d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -744,8 +744,8 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - - SNode* pCondition; + SLimitInfo limitInfo; + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -785,7 +785,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); -void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); +void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index c46485a332..56a5e253d8 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -50,7 +50,7 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead STableListInfo* pTableList = &pTaskInfo->tableqinfoList; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0036a3f080..df039bc0e2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3601,13 +3601,13 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf return TSDB_CODE_SUCCESS; } -void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { +void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) { ASSERT(numOfRows != 0); - pOperator->resultInfo.capacity = numOfRows; - pOperator->resultInfo.threshold = numOfRows * 0.75; + pResultInfo->capacity = numOfRows; + pResultInfo->threshold = numOfRows * 0.75; - if (pOperator->resultInfo.threshold == 0) { - pOperator->resultInfo.threshold = numOfRows; + if (pResultInfo->threshold == 0) { + pResultInfo->threshold = numOfRows; } } @@ -3670,7 +3670,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 1024; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3825,7 +3825,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys if (numOfRows * pResBlock->info.rowSize > TWOMB) { numOfRows = TWOMB / pResBlock->info.rowSize; } - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -4003,7 +4003,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy numOfRows = TWOMB / pResBlock->info.rowSize; } - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -4080,7 +4080,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; int32_t numOfOutputCols = 0; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2964948e70..20630fd6ff 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -406,7 +406,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx goto _error; } - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index b864fae47f..497de8347c 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -41,7 +41,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = pResBlock; pOperator->name = "MergeJoinOperator"; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dc44de9cf3..1f77dfb093 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2299,7 +2299,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->pCondition = pScanNode->node.pConditions; pInfo->scanCols = colList; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); tNameAssign(&pInfo->name, &pScanNode->tableName); const char* name = tNameGetTableName(&pInfo->name); @@ -2529,7 +2529,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = @@ -3073,7 +3073,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->info = pInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL, diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a3b79d9597..f019ee94b8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -26,7 +26,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -41,13 +41,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + + initResultSizeInfo(&pOperator->resultInfo, 1024); + pInfo->binfo.pRes = pResBlock; - - initResultSizeInfo(pOperator, 1024); - - pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pCondition = pSortNode->node.pConditions; pInfo->pColMatchInfo = pColMatchColInfo; + initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blocking = true; @@ -208,26 +210,44 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); - if (pBlock != NULL) { - doFilter(pInfo->pCondition, pBlock); - } - + pInfo->pColMatchInfo, pInfo); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); - break; + return NULL; } - if (blockDataGetNumOfRows(pBlock) > 0) { + doFilter(pInfo->pCondition, pBlock); + if (blockDataGetNumOfRows(pBlock) == 0) { + continue; + } + + // todo add the limit/offset info + if (pInfo->limitInfo.remainOffset > 0) { + if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { + pInfo->limitInfo.remainOffset -= pBlock->info.rows; + continue; + } + + blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset); + pInfo->limitInfo.remainOffset = 0; + } + + if (pInfo->limitInfo.limit.limit > 0 && + pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) { + int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows; + blockDataKeepFirstNRows(pBlock, remain); + } + + size_t numOfRows = blockDataGetNumOfRows(pBlock); + pInfo->limitInfo.numOfOutputRows += numOfRows; + pOperator->resultInfo.totalRows += numOfRows; + + if (numOfRows > 0) { break; } } - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; - } - - return pBlock; + return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL; } void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { @@ -479,7 +499,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); ; @@ -711,7 +731,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; pInfo->binfo.pRes = pResBlock; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3933ded7a9..042f807271 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1683,7 +1683,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExprSupp* pSup = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1764,7 +1764,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); @@ -2224,7 +2224,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -2272,7 +2272,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -2320,7 +2320,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo } size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -2896,7 +2896,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); @@ -3072,7 +3072,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); @@ -4336,7 +4336,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); pInfo->stateCol = extractColumnFromColumnNode(pColNode); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); @@ -4586,7 +4586,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->primaryTsIndex = primaryTsSlotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -4892,7 +4892,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI SExprSupp* pExprSupp = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&iaInfo->binfo, pResBlock); From df58a9bb30341843081ad3cc3a4072423665eaee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Jul 2022 15:30:52 +0800 Subject: [PATCH 3/4] fix(query): handle fraction of ts in add ts offset. --- source/common/src/ttime.c | 6 ++++-- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executil.c | 4 ++-- source/libs/executor/src/executorimpl.c | 17 +++++++++-------- source/libs/executor/src/scanoperator.c | 4 ++-- tests/script/tsim/query/interval-offset.sim | 1 + 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 9b044d6e93..8f150441f9 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -700,6 +700,8 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { numOfMonth *= 12; } + int64_t fraction = t % TSDB_TICK_PER_SECOND(precision); + struct tm tm; time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); taosLocalTime(&tt, &tm); @@ -707,7 +709,7 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); + return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction); } int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision) { @@ -851,7 +853,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio newEnd = taosTimeAdd(newEnd, -pInterval->sliding, pInterval->slidingUnit, precision); } - start = taosTimeAdd(end, -pInterval->interval + 1, pInterval->intervalUnit, precision); + start = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1; } } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9142af4a6d..2cc4058b3b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -797,7 +797,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); -void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); +STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 978bef1607..60799e0528 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -824,10 +824,10 @@ int32_t convertFillType(int32_t mode) { static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) { if (ascQuery) { - getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w); + *w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts); } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w); + *w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts); int64_t key = w->skey; while (key < ts) { // moving towards end diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index df039bc0e2..44d1cc0965 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -834,18 +834,20 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) { void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } ///////////////////////////////////////////////////////////////////////////////////////////// -// todo refactor : return window -void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) { - win->skey = taosTimeTruncate(key, pInterval, precision); +STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { + STimeWindow win = {0}; + win.skey = taosTimeTruncate(key, pInterval, precision); /* * if the realSkey > INT64_MAX - pInterval->interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - if (win->ekey < win->skey) { - win->ekey = INT64_MAX; + win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + if (win.ekey < win.skey) { + win.ekey = INT64_MAX; } + + return win; } #if 0 @@ -4042,8 +4044,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); - STimeWindow w = TSWINDOW_INITIALIZER; - getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w); + STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); int32_t order = TSDB_ORDER_ASC; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1f77dfb093..ae5996c2fa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -125,7 +125,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } if (order == TSDB_ORDER_ASC) { - getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w); + w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey); assert(w.ekey >= pBlockInfo->window.skey); if (w.ekey < pBlockInfo->window.ekey) { @@ -144,7 +144,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } } } else { - getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey, &w); + w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey); assert(w.skey <= pBlockInfo->window.ekey); if (w.skey > pBlockInfo->window.skey) { diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index b6dffb5fe3..1399be7b53 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -185,6 +185,7 @@ print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows2: $data20 $data21 $data22 $data23 $data24 print ===> rows3: $data30 $data31 $data32 $data33 $data34 if $rows != 4 then + print expect 4, actual: $rows return -1 endi if $data00 != @21-12-08 00:00:00.000@ then From fc7887e6b7b957648275f93fdaef3f57c867e6d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Jul 2022 17:24:49 +0800 Subject: [PATCH 4/4] fix(query): update the sim. --- include/common/ttime.h | 1 - source/common/src/ttime.c | 26 -------------------------- source/libs/scalar/src/sclvector.c | 2 +- tests/script/tsim/scalar/scalar.sim | 9 ++++++--- 4 files changed, 7 insertions(+), 31 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index 2f4129f979..8f4f4f15d8 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -73,7 +73,6 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { } int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); -int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 8f150441f9..b1e4321053 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -712,32 +712,6 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction); } -int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision) { - if (duration == 0) { - return t; - } - - if (unit != 'n' && unit != 'y') { - return t - duration; - } - - // The following code handles the y/n time duration - int64_t numOfMonth = duration; - if (unit == 'y') { - numOfMonth *= 12; - } - - struct tm tm; - time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); - taosLocalTime(&tt, &tm); - int32_t mon = tm.tm_year * 12 + tm.tm_mon - (int32_t)numOfMonth; - tm.tm_year = mon / 12; - tm.tm_mon = mon % 12; - - return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); -} - - int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { if (ekey < skey) { int64_t tmp = ekey; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 76de4da4fd..254c99f05d 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1195,7 +1195,7 @@ static void vectorMathTsSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pR colDataAppendNULL(pOutputCol, i); continue; // TODO set null or ignore } - *output = taosTimeSub(getVectorBigintValueFnLeft(pLeftCol->pData, i), getVectorBigintValueFnRight(pRightCol->pData, 0), + *output = taosTimeAdd(getVectorBigintValueFnLeft(pLeftCol->pData, i), -getVectorBigintValueFnRight(pRightCol->pData, 0), pRightCol->info.scale, pRightCol->info.precision); } diff --git a/tests/script/tsim/scalar/scalar.sim b/tests/script/tsim/scalar/scalar.sim index 32224e33ba..29cc67ec24 100644 --- a/tests/script/tsim/scalar/scalar.sim +++ b/tests/script/tsim/scalar/scalar.sim @@ -43,7 +43,8 @@ sql select cast(1 as timestamp)+1n; if $rows != 1 then return -1 endi -if $data00 != @70-02-01 08:00:00.000@ then +if $data00 != @70-02-01 08:00:00.001@ then + print expect 70-02-01 08:00:00.001, actual: $data00 return -1 endi @@ -52,11 +53,13 @@ if $rows != 1 then return -1 endi -sql select cast(1 as timestamp)-1y; +# there is an *bug* in print timestamp that smaller than 0, so let's try value that is greater than 0. +sql select cast(1 as timestamp)+1y; if $rows != 1 then return -1 endi -if $data00 != @69-01-01 08:00:00.000@ then +if $data00 != @71-01-01 08:00:00.001@ then + print expect 71-01-01 08:00:00.001 , actual: $data00 return -1 endi