diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 1303a1fb6a..eafe64c294 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -295,19 +295,19 @@ typedef struct SPoint {
void * val;
} SPoint;
-void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
-void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
-void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
-struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal);
-bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
-
-struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
- int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
- struct SFillColInfo* pFillCol, const char* id);
-
-void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
-int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
-int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
+//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
+//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
+//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
+//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
+//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
+//
+//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
+// SInterval* pInterval, int32_t fillType,
+// struct SFillColInfo* pCol, const char* id);
+//
+//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
+//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
+//int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 4c9ed78769..9e48e12cb5 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -427,6 +427,7 @@ typedef struct STableIntervalOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
+ struct SFillInfo* pFillInfo; // fill info
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
@@ -467,7 +468,6 @@ typedef struct SFillOperatorInfo {
SSDataBlock* existNewGroupBlock;
bool multigroupResult;
SInterval intervalInfo;
- int32_t capacity;
} SFillOperatorInfo;
typedef struct {
@@ -609,7 +609,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
-void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
+void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
@@ -621,7 +621,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
-void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow* win);
+void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
@@ -645,8 +645,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
- STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
- SExecTaskInfo* pTaskInfo);
+ STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
diff --git a/source/libs/function/inc/tfill.h b/source/libs/executor/inc/tfill.h
similarity index 71%
rename from source/libs/function/inc/tfill.h
rename to source/libs/executor/inc/tfill.h
index b90dbf7799..26d066d9a9 100644
--- a/source/libs/function/inc/tfill.h
+++ b/source/libs/executor/inc/tfill.h
@@ -22,15 +22,18 @@ extern "C" {
#include "os.h"
#include "taosdef.h"
+#include "tcommon.h"
struct SSDataBlock;
typedef struct SFillColInfo {
- STColumn col; // column info
+// STColumn col; // column info
+ SResSchema col;
int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
- union {int64_t i; double d;} fillVal;
+ int32_t offset;
+ union {int64_t i; double d;} val;
} SFillColInfo;
typedef struct {
@@ -57,7 +60,6 @@ typedef struct SFillInfo {
char * nextValues; // next row of data
char** pData; // original result data block involved in filling data
int32_t alloc; // data buffer size in rows
- int8_t precision; // time resoluation
SFillColInfo* pFillCol; // column info for fill operations
SFillTagColInfo* pTags; // tags value for filling gap
@@ -67,7 +69,19 @@ typedef struct SFillInfo {
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
+void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
+void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
+void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
+struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val);
+bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
+SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
+ SInterval* pInterval, int32_t fillType,
+ struct SFillColInfo* pCol, const char* id);
+
+void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
+int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
+int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
#ifdef __cplusplus
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index cd129b54fb..7791a345ed 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -14,11 +14,12 @@
*/
#include "filter.h"
-#include "functionMgt.h"
#include "function.h"
+#include "functionMgt.h"
+#include "os.h"
#include "querynodes.h"
#include "tname.h"
-#include "os.h"
+#include "tfill.h"
#include "tdatablock.h"
#include "tglobal.h"
@@ -514,13 +515,12 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
return pResult;
}
-static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, TSKEY ekey,
- bool ascQuery) {
+static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, bool ascQuery) {
if (ascQuery) {
- getAlignQueryTimeWindow(pInterval, precision, ts, ts, ekey, w);
+ getAlignQueryTimeWindow(pInterval, precision, ts, w);
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
- getAlignQueryTimeWindow(pInterval, precision, ts, ekey, ts, w);
+ getAlignQueryTimeWindow(pInterval, precision, ts, w);
int64_t key = w->skey;
while (key < ts) { // moving towards end
@@ -540,7 +540,7 @@ static STimeWindow getActiveTimeWindow(SDiskbasedBuf * pBuf, SResultRowInfo* pRe
STimeWindow w = {0};
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
- getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true);
+ getInitialStartTimeWindow(pInterval, precision, ts, &w, true);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
@@ -2015,20 +2015,16 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
/////////////////////////////////////////////////////////////////////////////////////////////
// todo refactor : return window
-void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast,
- STimeWindow* win) {
- ASSERT(key >= keyFirst && key <= keyLast);
+void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
win->skey = taosTimeTruncate(key, pInterval, precision);
/*
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/
- if (keyFirst > (INT64_MAX - pInterval->interval)) {
- assert(keyLast - keyFirst < pInterval->interval);
+ win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
+ if (win->ekey < win->skey) {
win->ekey = INT64_MAX;
- } else {
- win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
}
}
@@ -3176,10 +3172,12 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo*
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv));
pBlock->info.rows = numOfResult;
+ blockDataUpdateTsWindow(pBlock);
+
return 0;
}
-void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
+void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
int32_t* rowCellOffset) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
@@ -4813,7 +4811,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
- toSDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
+ doBuildResultDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
@@ -5126,6 +5124,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return NULL;
}
+ SSDataBlock* pBlock = pInfo->binfo.pRes;
+
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->getStreamResFn(pOperator, newgroup);
} else {
@@ -5134,15 +5134,15 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return NULL;
}
- blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
- toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
- pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
+ blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
+ doBuildResultDatablock(pBlock, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
+ pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
- if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
+ if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
- return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
+ return pBlock->info.rows == 0 ? NULL : pBlock;
}
}
@@ -5155,7 +5155,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
}
if (pOperator->status == OP_RES_TO_RETURN) {
- toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
+ doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@@ -5190,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
- toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
+ doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
ASSERT(pInfo->binfo.pRes->info.rows > 0);
pOperator->status = OP_RES_TO_RETURN;
@@ -5205,7 +5205,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
- // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
+ // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
@@ -5238,7 +5238,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
- // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
+ // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
@@ -5292,7 +5292,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
OPTR_SET_OPENED(pOperator);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
- toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
+ doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
@@ -5383,7 +5383,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
- toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
+ doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
@@ -5415,7 +5415,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
- toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
+ doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
@@ -5432,7 +5432,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
- toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
+ doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
@@ -5464,7 +5464,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
- toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
+ doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
@@ -5509,14 +5509,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo;
- blockDataCleanup(pInfo->pRes);
+ SSDataBlock* pResBlock = pInfo->pRes;
+
+ blockDataCleanup(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
- if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
- return pInfo->pRes;
+ if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
+ return pResBlock;
}
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
@@ -5551,25 +5553,25 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
}
}
- doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p);
+ doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p);
// current group has no more result to return
- if (pInfo->pRes->info.rows > 0) {
+ if (pResBlock->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
- if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
- return pInfo->pRes;
+ if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
+ return pResBlock;
}
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
- if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
- return pInfo->pRes;
+ if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
+ return pResBlock;
}
} else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
- if (pInfo->pRes->info.rows > pResultInfo->threshold) {
- return pInfo->pRes;
+ if (pResBlock->info.rows > pResultInfo->threshold) {
+ return pResBlock;
}
} else {
return NULL;
@@ -5615,7 +5617,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
- pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize);
+ pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
@@ -5878,8 +5880,7 @@ _error:
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
- STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
- SExecTaskInfo* pTaskInfo) {
+ STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@@ -5890,10 +5891,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window;
- pInfo->win.skey = 0;
- pInfo->win.ekey = INT64_MAX;
- pInfo->primaryTsIndex = primaryTsSlotId;
pInfo->twAggSup = *pTwAggSupp;
+ pInfo->primaryTsIndex = primaryTsSlotId;
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
@@ -6063,18 +6062,14 @@ _error:
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
- struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal);
-
- TSKEY sk = TMIN(win.skey, win.ekey);
- TSKEY ek = TMAX(win.skey, win.ekey);
+ SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL);
// TODO set correct time precision
STimeWindow w = TSWINDOW_INITIALIZER;
- getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w);
+ getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w);
int32_t order = TSDB_ORDER_ASC;
- pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding,
- pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id);
+ pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
@@ -6095,23 +6090,37 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pInfo->multigroupResult = multigroupResult;
pInfo->intervalInfo = *pInterval;
+ int32_t type = TSDB_FILL_NONE;
+ switch (fillType) {
+ case FILL_MODE_PREV: type = TSDB_FILL_PREV;break;
+ case FILL_MODE_NONE: type = TSDB_FILL_NONE;break;
+ case FILL_MODE_NULL: type = TSDB_FILL_NULL;break;
+ case FILL_MODE_NEXT: type = TSDB_FILL_NEXT;break;
+ case FILL_MODE_VALUE: type = TSDB_FILL_SET_VALUE;break;
+ case FILL_MODE_LINEAR: type = TSDB_FILL_LINEAR;break;
+ default:
+ type = TSDB_FILL_NONE;
+ }
+
SResultInfo* pResultInfo = &pOperator->resultInfo;
- int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity,
- pTaskInfo->id.str, pInterval, fillType);
+ initResultSizeInfo(pOperator, 4096);
+
+ int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity,
+ pTaskInfo->id.str, pInterval, type);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
- pOperator->name = "FillOperator";
+ pOperator->name = "FillOperator";
pOperator->blockingOptr = false;
- pOperator->status = OP_NOT_OPENED;
+ pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill;
- pOperator->pExpr = pExpr;
- pOperator->numOfOutput = numOfCols;
- pOperator->info = pInfo;
- pOperator->_openFn = operatorDummyOpenFn;
- pOperator->getNextFn = doFill;
- pOperator->pTaskInfo = pTaskInfo;
+ pOperator->pExpr = pExpr;
+ pOperator->numOfOutput = numOfCols;
+ pOperator->info = pInfo;
+ pOperator->_openFn = operatorDummyOpenFn;
+ pOperator->getNextFn = doFill;
+ pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroySFillOperatorInfo;
@@ -6582,6 +6591,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo);
+
+ if (pIntervalPhyNode->pFill != NULL) {
+ pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode, NULL, false, pTaskInfo);
+ }
+
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index d610880e30..9f75f97632 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
- toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
+ doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
while(1) {
- toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
+ doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index dc6bb93814..f0d3d95b6a 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -129,7 +129,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
// TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
if (true) {
- getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, sk, ek, &w);
+ getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w);
assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) {
diff --git a/source/libs/function/src/tfill.c b/source/libs/executor/src/tfill.c
similarity index 88%
rename from source/libs/function/src/tfill.c
rename to source/libs/executor/src/tfill.c
index b6b5362187..aeed07c636 100644
--- a/source/libs/function/src/tfill.c
+++ b/source/libs/executor/src/tfill.c
@@ -13,17 +13,18 @@
* along with this program. If not, see .
*/
-#include
+#include "function.h"
#include "os.h"
+#include "querynodes.h"
#include "taosdef.h"
#include "tmsg.h"
#include "ttypes.h"
#include "tfill.h"
-#include "thash.h"
#include "function.h"
#include "tcommon.h"
+#include "thash.h"
#include "ttime.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
@@ -41,7 +42,7 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
- assert (pTag->col.colId == pCol->col.colId);
+// assert (pTag->col.colId == pCol->col.colId);
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
}
}
@@ -80,7 +81,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
}
char* output = elePtrAt(data[i], pCol->col.bytes, index);
- assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
+// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
@@ -96,7 +97,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
}
char* output = elePtrAt(data[i], pCol->col.bytes, index);
- assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
+// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
@@ -119,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
continue;
}
- point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset};
+ point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
@@ -135,12 +136,13 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
}
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
- assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
+ assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
}
}
setTagsValue(pFillInfo, data, index);
- pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
+ pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit,
+ pFillInfo->interval.precision);
pFillInfo->numOfCurrent++;
}
@@ -152,7 +154,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
*next = taosMemoryCalloc(1, pFillInfo->rowSize);
for (int i = 1; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
- setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes);
+ setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes);
}
}
@@ -160,7 +162,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu
int32_t rowIndex = pFillInfo->index;
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
- memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
+ memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
}
}
@@ -227,21 +229,21 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) ||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
- memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
+ memcpy(*prev + pCol->offset, src, pCol->col.bytes);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) {
- assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type);
+ assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type);
- memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
+ memcpy(*prev + pCol->offset, src, pCol->col.bytes);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) {
- assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type);
+ assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type);
} else {
setNull(output, pCol->col.type, pCol->col.bytes);
}
} else {
- assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
+ assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
}
}
}
@@ -250,7 +252,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
- pFillInfo->interval.slidingUnit, pFillInfo->precision);
+ pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
pFillInfo->index += 1;
pFillInfo->numOfCurrent += 1;
}
@@ -301,7 +303,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
bool exists = false;
int32_t index = -1;
for (int32_t j = 0; j < k; ++j) {
- if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) {
+ if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) {
exists = true;
index = j;
break;
@@ -310,7 +312,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
if (!exists) {
SSchema* pSchema = &pFillInfo->pTags[k].col;
- pSchema->colId = pColInfo->col.colId;
+ pSchema->colId = pColInfo->col.slotId;
pSchema->type = pColInfo->col.type;
pSchema->bytes = pColInfo->col.bytes;
@@ -341,30 +343,40 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
}
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
- int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
- struct SFillColInfo* pCol, const char* id) {
+ SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, const char* id) {
if (fillType == TSDB_FILL_NONE) {
return NULL;
}
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
+ if (pFillInfo == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+
taosResetFillInfo(pFillInfo, skey);
- pFillInfo->order = order;
+ pFillInfo->order = order;
+
+ switch(fillType) {
+ case FILL_MODE_NONE: pFillInfo->type = TSDB_FILL_NONE; break;
+ case FILL_MODE_PREV: pFillInfo->type = TSDB_FILL_PREV; break;
+ case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break;
+ case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
+ case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break;
+ default:
+ terrno = TSDB_CODE_INVALID_PARA;
+ return NULL;
+ }
+
pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol;
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
- pFillInfo->precision = precision;
pFillInfo->alloc = capacity;
pFillInfo->id = id;
-
- pFillInfo->interval.interval = slidingTime;
- pFillInfo->interval.intervalUnit = slidingUnit;
- pFillInfo->interval.sliding = slidingTime;
- pFillInfo->interval.slidingUnit = slidingUnit;
-
- pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
+ pFillInfo->interval = *pInterval;
+ pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
// if (numOfTags > 0) {
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
@@ -375,7 +387,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0);
-
return pFillInfo;
}
@@ -417,7 +428,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
pFillInfo->end = endKey;
if (!FILL_IS_ASC_FILL(pFillInfo)) {
- pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->precision);
+ pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->interval.precision);
}
pFillInfo->index = 0;
@@ -433,7 +444,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
- assert (pTag->col.colId == pCol->col.colId);
+ assert (pTag->col.colId == pCol->col.slotId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
}
}
@@ -460,7 +471,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
TSKEY ekey1 = ekey;
if (!FILL_IS_ASC_FILL(pFillInfo)) {
- pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision);
+ pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->interval.precision);
}
int64_t numOfRes = -1;
@@ -471,7 +482,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
pFillInfo->currentKey,
pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit,
- pFillInfo->precision);
+ pFillInfo->interval.precision);
numOfRes += 1;
assert(numOfRes >= numOfRows);
} else { // reach the end of data
@@ -484,7 +495,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
pFillInfo->currentKey,
pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit,
- pFillInfo->precision);
+ pFillInfo->interval.precision);
numOfRes += 1;
}
@@ -527,7 +538,7 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
return pFillInfo->start;
}
-struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal) {
+struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) {
int32_t offset = 0;
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
@@ -538,14 +549,15 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
- pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes;
- pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type;
- pFillCol[i].col.offset = offset;
- pFillCol[i].col.colId = pExprInfo->base.resSchema.slotId;
+ pFillCol[i].col = pExprInfo->base.resSchema;
+ pFillCol[i].offset = offset;
pFillCol[i].tagIndex = -2;
- pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
+
+ if (pExprInfo->base.numOfParams > 0) {
+ pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
+ }
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
- pFillCol[i].fillVal.i = fillVal[i];
+// pFillCol[i].val.d = *val;
offset += pExprInfo->base.resSchema.bytes;
}
diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c
index 7f55373463..56ee9bc9ae 100644
--- a/source/libs/function/src/taggfunction.c
+++ b/source/libs/function/src/taggfunction.c
@@ -19,14 +19,14 @@
#include "thash.h"
#include "ttypes.h"
+//#include "tfill.h"
#include "function.h"
#include "taggfunction.h"
-#include "tfill.h"
-#include "thistogram.h"
-#include "ttszip.h"
-#include "tpercentile.h"
#include "tbuffer.h"
#include "tcompression.h"
+#include "thistogram.h"
+#include "tpercentile.h"
+#include "ttszip.h"
//#include "queryLog.h"
#include "tdatablock.h"
#include "tudf.h"
@@ -3608,7 +3608,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
- taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
+// taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
}
} else {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
@@ -3681,7 +3681,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
if (isNull(start, srcType) || isNull(end, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
- taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType);
+// taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType);
}
} else {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
diff --git a/tests/script/tsim/query/explain.sim b/tests/script/tsim/query/explain.sim
index 638109d510..66d3c48f5d 100644
--- a/tests/script/tsim/query/explain.sim
+++ b/tests/script/tsim/query/explain.sim
@@ -45,7 +45,7 @@ sql explain select * from information_schema.user_stables;
sql explain select count(*),sum(f1) from tb1;
sql explain select count(*),sum(f1) from st1;
sql explain select count(*),sum(f1) from st1 group by f1;
-sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
+#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
print ======== step3
@@ -65,7 +65,7 @@ sql explain analyze select * from information_schema.user_stables;
sql explain analyze select count(*),sum(f1) from tb1;
sql explain analyze select count(*),sum(f1) from st1;
sql explain analyze select count(*),sum(f1) from st1 group by f1;
-sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
+#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
print ======== step5
@@ -78,7 +78,7 @@ sql explain analyze verbose true select * from information_schema.user_stables;
sql explain analyze verbose true select count(*),sum(f1) from tb1;
sql explain analyze verbose true select count(*),sum(f1) from st1;
sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1;
-sql explain analyze verbose true select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
+#sql explain analyze verbose true select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
sql explain analyze verbose true select ts from tb1 where f1 > 0;
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';