From 1f92000c486363a08f6d22fcd8e7f1d1383ca9db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Apr 2022 14:58:32 +0800 Subject: [PATCH 1/5] fix(query): fix invalid write in distinct query. --- source/libs/executor/src/executorimpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cd129b54fb..9d84f45de7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5615,7 +5615,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); - pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize); + pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); From 343988beeb4d2ee5be791cd84bbe7a56e37aec5c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Apr 2022 14:59:06 +0800 Subject: [PATCH 2/5] refactor: do some internal refactor. --- include/libs/function/function.h | 26 ++-- source/libs/executor/inc/executorimpl.h | 9 +- .../libs/{function => executor}/inc/tfill.h | 20 ++- source/libs/executor/src/executorimpl.c | 140 ++++++++++-------- source/libs/executor/src/groupoperator.c | 4 +- source/libs/executor/src/scanoperator.c | 2 +- .../libs/{function => executor}/src/tfill.c | 96 ++++++------ 7 files changed, 168 insertions(+), 129 deletions(-) rename source/libs/{function => executor}/inc/tfill.h (71%) rename source/libs/{function => executor}/src/tfill.c (88%) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 1303a1fb6a..eafe64c294 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -295,19 +295,19 @@ typedef struct SPoint { void * val; } SPoint; -void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); -void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); -void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); -struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal); -bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); - -struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, - int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, - struct SFillColInfo* pFillCol, const char* id); - -void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); -int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); -int64_t getFillInfoStart(struct SFillInfo *pFillInfo); +//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); +//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); +//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); +//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val); +//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); +// +//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, +// SInterval* pInterval, int32_t fillType, +// struct SFillColInfo* pCol, const char* id); +// +//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); +//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); +//int64_t getFillInfoStart(struct SFillInfo *pFillInfo); int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4c9ed78769..9e48e12cb5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -427,6 +427,7 @@ typedef struct STableIntervalOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. STimeWindowAggSupp twAggSup; + struct SFillInfo* pFillInfo; // fill info } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -467,7 +468,6 @@ typedef struct SFillOperatorInfo { SSDataBlock* existNewGroupBlock; bool multigroupResult; SInterval intervalInfo; - int32_t capacity; } SFillOperatorInfo; typedef struct { @@ -609,7 +609,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); -void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, +void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); @@ -621,7 +621,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); -void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow* win); +void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); @@ -645,8 +645,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo); + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, diff --git a/source/libs/function/inc/tfill.h b/source/libs/executor/inc/tfill.h similarity index 71% rename from source/libs/function/inc/tfill.h rename to source/libs/executor/inc/tfill.h index b90dbf7799..26d066d9a9 100644 --- a/source/libs/function/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -22,15 +22,18 @@ extern "C" { #include "os.h" #include "taosdef.h" +#include "tcommon.h" struct SSDataBlock; typedef struct SFillColInfo { - STColumn col; // column info +// STColumn col; // column info + SResSchema col; int16_t functionId; // sql function id int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN int16_t tagIndex; // index of current tag in SFillTagColInfo array list - union {int64_t i; double d;} fillVal; + int32_t offset; + union {int64_t i; double d;} val; } SFillColInfo; typedef struct { @@ -57,7 +60,6 @@ typedef struct SFillInfo { char * nextValues; // next row of data char** pData; // original result data block involved in filling data int32_t alloc; // data buffer size in rows - int8_t precision; // time resoluation SFillColInfo* pFillCol; // column info for fill operations SFillTagColInfo* pTags; // tags value for filling gap @@ -67,7 +69,19 @@ typedef struct SFillInfo { int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); +void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); +void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); +void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); +struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val); +bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); +SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, + SInterval* pInterval, int32_t fillType, + struct SFillColInfo* pCol, const char* id); + +void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); +int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); +int64_t getFillInfoStart(struct SFillInfo *pFillInfo); #ifdef __cplusplus diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9d84f45de7..7791a345ed 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -14,11 +14,12 @@ */ #include "filter.h" -#include "functionMgt.h" #include "function.h" +#include "functionMgt.h" +#include "os.h" #include "querynodes.h" #include "tname.h" -#include "os.h" +#include "tfill.h" #include "tdatablock.h" #include "tglobal.h" @@ -514,13 +515,12 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR return pResult; } -static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, TSKEY ekey, - bool ascQuery) { +static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, bool ascQuery) { if (ascQuery) { - getAlignQueryTimeWindow(pInterval, precision, ts, ts, ekey, w); + getAlignQueryTimeWindow(pInterval, precision, ts, w); } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - getAlignQueryTimeWindow(pInterval, precision, ts, ekey, ts, w); + getAlignQueryTimeWindow(pInterval, precision, ts, w); int64_t key = w->skey; while (key < ts) { // moving towards end @@ -540,7 +540,7 @@ static STimeWindow getActiveTimeWindow(SDiskbasedBuf * pBuf, SResultRowInfo* pRe STimeWindow w = {0}; if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value - getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true); + getInitialStartTimeWindow(pInterval, precision, ts, &w, true); w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } else { w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win; @@ -2015,20 +2015,16 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) { ///////////////////////////////////////////////////////////////////////////////////////////// // todo refactor : return window -void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, - STimeWindow* win) { - ASSERT(key >= keyFirst && key <= keyLast); +void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) { win->skey = taosTimeTruncate(key, pInterval, precision); /* * if the realSkey > INT64_MAX - pInterval->interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - if (keyFirst > (INT64_MAX - pInterval->interval)) { - assert(keyLast - keyFirst < pInterval->interval); + win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + if (win->ekey < win->skey) { win->ekey = INT64_MAX; - } else { - win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } } @@ -3176,10 +3172,12 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* // qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)); pBlock->info.rows = numOfResult; + blockDataUpdateTsWindow(pBlock); + return 0; } -void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, +void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); @@ -4813,7 +4811,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) } blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - toSDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset); + doBuildResultDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5126,6 +5124,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return NULL; } + SSDataBlock* pBlock = pInfo->binfo.pRes; + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { return pOperator->getStreamResFn(pOperator, newgroup); } else { @@ -5134,15 +5134,15 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return NULL; } - blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, - pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); + doBuildResultDatablock(pBlock, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + return pBlock->info.rows == 0 ? NULL : pBlock; } } @@ -5155,7 +5155,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup } if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5190,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); ASSERT(pInfo->binfo.pRes->info.rows > 0); pOperator->status = OP_RES_TO_RETURN; @@ -5205,7 +5205,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { STimeSliceOperatorInfo* pSliceInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5238,7 +5238,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); - // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); + // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -5292,7 +5292,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup OPTR_SET_OPENED(pOperator); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, + doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { @@ -5383,7 +5383,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5415,7 +5415,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5432,7 +5432,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5464,7 +5464,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5509,14 +5509,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SResultInfo* pResultInfo = &pOperator->resultInfo; - blockDataCleanup(pInfo->pRes); + SSDataBlock* pResBlock = pInfo->pRes; + + blockDataCleanup(pResBlock); if (pOperator->status == OP_EXEC_DONE) { return NULL; } doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); - if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { - return pInfo->pRes; + if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) { + return pResBlock; } SOperatorInfo* pDownstream = pOperator->pDownstream[0]; @@ -5551,25 +5553,25 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { } } - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p); // current group has no more result to return - if (pInfo->pRes->info.rows > 0) { + if (pResBlock->info.rows > 0) { // 1. The result in current group not reach the threshold of output result, continue // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately - if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) { - return pInfo->pRes; + if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) { + return pResBlock; } doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); - if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) { - return pInfo->pRes; + if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) { + return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group assert(pBlock != NULL); doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo); - if (pInfo->pRes->info.rows > pResultInfo->threshold) { - return pInfo->pRes; + if (pResBlock->info.rows > pResultInfo->threshold) { + return pResBlock; } } else { return NULL; @@ -5878,8 +5880,7 @@ _error: SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, - SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5890,10 +5891,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; pInfo->win = pTaskInfo->window; - pInfo->win.skey = 0; - pInfo->win.ekey = INT64_MAX; - pInfo->primaryTsIndex = primaryTsSlotId; pInfo->twAggSup = *pTwAggSupp; + pInfo->primaryTsIndex = primaryTsSlotId; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -6063,18 +6062,14 @@ _error: static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { - struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal); - - TSKEY sk = TMIN(win.skey, win.ekey); - TSKEY ek = TMAX(win.skey, win.ekey); + SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL); // TODO set correct time precision STimeWindow w = TSWINDOW_INITIALIZER; - getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w); + getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w); int32_t order = TSDB_ORDER_ASC; - pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding, - pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id); + pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -6095,23 +6090,37 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pInfo->multigroupResult = multigroupResult; pInfo->intervalInfo = *pInterval; + int32_t type = TSDB_FILL_NONE; + switch (fillType) { + case FILL_MODE_PREV: type = TSDB_FILL_PREV;break; + case FILL_MODE_NONE: type = TSDB_FILL_NONE;break; + case FILL_MODE_NULL: type = TSDB_FILL_NULL;break; + case FILL_MODE_NEXT: type = TSDB_FILL_NEXT;break; + case FILL_MODE_VALUE: type = TSDB_FILL_SET_VALUE;break; + case FILL_MODE_LINEAR: type = TSDB_FILL_LINEAR;break; + default: + type = TSDB_FILL_NONE; + } + SResultInfo* pResultInfo = &pOperator->resultInfo; - int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity, - pTaskInfo->id.str, pInterval, fillType); + initResultSizeInfo(pOperator, 4096); + + int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity, + pTaskInfo->id.str, pInterval, type); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pOperator->name = "FillOperator"; + pOperator->name = "FillOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Fill; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doFill; - pOperator->pTaskInfo = pTaskInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doFill; + pOperator->pTaskInfo = pTaskInfo; pOperator->closeFn = destroySFillOperatorInfo; @@ -6582,6 +6591,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo); + + if (pIntervalPhyNode->pFill != NULL) { + pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode, NULL, false, pTaskInfo); + } + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d610880e30..9f75f97632 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); while(1) { - toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6b06f3e89b..daa5ac1492 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -129,7 +129,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn // TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); if (true) { - getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, sk, ek, &w); + getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w); assert(w.ekey >= pBlockInfo->window.skey); if (w.ekey < pBlockInfo->window.ekey) { diff --git a/source/libs/function/src/tfill.c b/source/libs/executor/src/tfill.c similarity index 88% rename from source/libs/function/src/tfill.c rename to source/libs/executor/src/tfill.c index b6b5362187..aeed07c636 100644 --- a/source/libs/function/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -13,17 +13,18 @@ * along with this program. If not, see . */ -#include +#include "function.h" #include "os.h" +#include "querynodes.h" #include "taosdef.h" #include "tmsg.h" #include "ttypes.h" #include "tfill.h" -#include "thash.h" #include "function.h" #include "tcommon.h" +#include "thash.h" #include "ttime.h" #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) @@ -41,7 +42,7 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; - assert (pTag->col.colId == pCol->col.colId); +// assert (pTag->col.colId == pCol->col.colId); assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); } } @@ -80,7 +81,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData } char* output = elePtrAt(data[i], pCol->col.bytes, index); - assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); +// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type); } } else { // no prev value yet, set the value for NULL setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); @@ -96,7 +97,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData } char* output = elePtrAt(data[i], pCol->col.bytes, index); - assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); +// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type); } } else { // no prev value yet, set the value for NULL setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); @@ -119,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData continue; } - point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset}; + point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; point = (SPoint){.key = pFillInfo->currentKey, .val = val1}; taosGetLinearInterpolationVal(&point, type, &point1, &point2, type); @@ -135,12 +136,13 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData } char* val1 = elePtrAt(data[i], pCol->col.bytes, index); - assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); + assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type); } } setTagsValue(pFillInfo, data, index); - pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision); + pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, + pFillInfo->interval.precision); pFillInfo->numOfCurrent++; } @@ -152,7 +154,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { *next = taosMemoryCalloc(1, pFillInfo->rowSize); for (int i = 1; i < pFillInfo->numOfCols; i++) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes); + setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes); } } @@ -160,7 +162,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu int32_t rowIndex = pFillInfo->index; for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes); + memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes); } } @@ -227,21 +229,21 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) || (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) { assignVal(output, src, pCol->col.bytes, pCol->col.type); - memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); + memcpy(*prev + pCol->offset, src, pCol->col.bytes); } else { // i > 0 and data is null , do interpolation if (pFillInfo->type == TSDB_FILL_PREV) { - assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type); + assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type); } else if (pFillInfo->type == TSDB_FILL_LINEAR) { assignVal(output, src, pCol->col.bytes, pCol->col.type); - memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); + memcpy(*prev + pCol->offset, src, pCol->col.bytes); } else if (pFillInfo->type == TSDB_FILL_NEXT) { if (*next) { - assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type); + assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type); } else { setNull(output, pCol->col.type, pCol->col.bytes); } } else { - assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); + assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type); } } } @@ -250,7 +252,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, - pFillInfo->interval.slidingUnit, pFillInfo->precision); + pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); pFillInfo->index += 1; pFillInfo->numOfCurrent += 1; } @@ -301,7 +303,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t bool exists = false; int32_t index = -1; for (int32_t j = 0; j < k; ++j) { - if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) { + if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) { exists = true; index = j; break; @@ -310,7 +312,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t if (!exists) { SSchema* pSchema = &pFillInfo->pTags[k].col; - pSchema->colId = pColInfo->col.colId; + pSchema->colId = pColInfo->col.slotId; pSchema->type = pColInfo->col.type; pSchema->bytes = pColInfo->col.bytes; @@ -341,30 +343,40 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { } struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, - int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, - struct SFillColInfo* pCol, const char* id) { + SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, const char* id) { if (fillType == TSDB_FILL_NONE) { return NULL; } SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo)); + if (pFillInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + taosResetFillInfo(pFillInfo, skey); - pFillInfo->order = order; + pFillInfo->order = order; + + switch(fillType) { + case FILL_MODE_NONE: pFillInfo->type = TSDB_FILL_NONE; break; + case FILL_MODE_PREV: pFillInfo->type = TSDB_FILL_PREV; break; + case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break; + case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break; + case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break; + default: + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + pFillInfo->type = fillType; pFillInfo->pFillCol = pCol; pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfCols; - pFillInfo->precision = precision; pFillInfo->alloc = capacity; pFillInfo->id = id; - - pFillInfo->interval.interval = slidingTime; - pFillInfo->interval.intervalUnit = slidingUnit; - pFillInfo->interval.sliding = slidingTime; - pFillInfo->interval.slidingUnit = slidingUnit; - - pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols); + pFillInfo->interval = *pInterval; + pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols); // if (numOfTags > 0) { pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo)); @@ -375,7 +387,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); assert(pFillInfo->rowSize > 0); - return pFillInfo; } @@ -417,7 +428,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) pFillInfo->end = endKey; if (!FILL_IS_ASC_FILL(pFillInfo)) { - pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->precision); + pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->interval.precision); } pFillInfo->index = 0; @@ -433,7 +444,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; - assert (pTag->col.colId == pCol->col.colId); + assert (pTag->col.colId == pCol->col.slotId); memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? } } @@ -460,7 +471,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma TSKEY ekey1 = ekey; if (!FILL_IS_ASC_FILL(pFillInfo)) { - pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision); + pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->interval.precision); } int64_t numOfRes = -1; @@ -471,7 +482,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma pFillInfo->currentKey, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, - pFillInfo->precision); + pFillInfo->interval.precision); numOfRes += 1; assert(numOfRes >= numOfRows); } else { // reach the end of data @@ -484,7 +495,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma pFillInfo->currentKey, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, - pFillInfo->precision); + pFillInfo->interval.precision); numOfRes += 1; } @@ -527,7 +538,7 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) { return pFillInfo->start; } -struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal) { +struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) { int32_t offset = 0; struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo)); @@ -538,14 +549,15 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co for(int32_t i = 0; i < numOfOutput; ++i) { SExprInfo* pExprInfo = &pExpr[i]; - pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes; - pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type; - pFillCol[i].col.offset = offset; - pFillCol[i].col.colId = pExprInfo->base.resSchema.slotId; + pFillCol[i].col = pExprInfo->base.resSchema; + pFillCol[i].offset = offset; pFillCol[i].tagIndex = -2; - pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query + + if (pExprInfo->base.numOfParams > 0) { + pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query + } // pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId; - pFillCol[i].fillVal.i = fillVal[i]; +// pFillCol[i].val.d = *val; offset += pExprInfo->base.resSchema.bytes; } From f933aa01cb6789a228a3e6722efbfd8a518760ce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Apr 2022 15:20:08 +0800 Subject: [PATCH 3/5] fix:fix some compiling errors. --- source/libs/function/src/taggfunction.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index e001ef4071..757a3b6107 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -19,14 +19,14 @@ #include "thash.h" #include "ttypes.h" +//#include "tfill.h" #include "function.h" #include "taggfunction.h" -#include "tfill.h" -#include "thistogram.h" -#include "ttszip.h" -#include "tpercentile.h" #include "tbuffer.h" #include "tcompression.h" +#include "thistogram.h" +#include "tpercentile.h" +#include "ttszip.h" //#include "queryLog.h" #include "tdatablock.h" #include "tudf.h" @@ -3681,7 +3681,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) { if (isNull(start, srcType) || isNull(end, srcType)) { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } else { - taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType); +// taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType); } } else { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); From 3e80cead0bb04560f7594ad1b1c210632c641fbf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Apr 2022 15:21:51 +0800 Subject: [PATCH 4/5] fix:fix an link error. --- source/libs/function/src/taggfunction.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 757a3b6107..c990df943a 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -3608,7 +3608,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) { if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } else { - taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); +// taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); } } else { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); From 011efa67bfcc6bad6965699e28c80bc0fe020ed8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Apr 2022 16:00:37 +0800 Subject: [PATCH 5/5] test: temporarily disable some invalid cases. --- tests/script/tsim/query/explain.sim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/script/tsim/query/explain.sim b/tests/script/tsim/query/explain.sim index 638109d510..66d3c48f5d 100644 --- a/tests/script/tsim/query/explain.sim +++ b/tests/script/tsim/query/explain.sim @@ -45,7 +45,7 @@ sql explain select * from information_schema.user_stables; sql explain select count(*),sum(f1) from tb1; sql explain select count(*),sum(f1) from st1; sql explain select count(*),sum(f1) from st1 group by f1; -sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); +#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s); print ======== step3 @@ -65,7 +65,7 @@ sql explain analyze select * from information_schema.user_stables; sql explain analyze select count(*),sum(f1) from tb1; sql explain analyze select count(*),sum(f1) from st1; sql explain analyze select count(*),sum(f1) from st1 group by f1; -sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); +#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m); print ======== step5 @@ -78,7 +78,7 @@ sql explain analyze verbose true select * from information_schema.user_stables; sql explain analyze verbose true select count(*),sum(f1) from tb1; sql explain analyze verbose true select count(*),sum(f1) from st1; sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1; -sql explain analyze verbose true select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); +#sql explain analyze verbose true select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev); sql explain analyze verbose true select ts from tb1 where f1 > 0; sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00'; sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';