Merge pull request #11664 from taosdata/feature/3.0_liaohj
fix(query): fix invalid write in distinct query.
This commit is contained in:
commit
490f7f3b14
|
@ -295,19 +295,19 @@ typedef struct SPoint {
|
||||||
void * val;
|
void * val;
|
||||||
} SPoint;
|
} SPoint;
|
||||||
|
|
||||||
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||||
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||||
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal);
|
//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
|
||||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||||
|
//
|
||||||
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
// SInterval* pInterval, int32_t fillType,
|
||||||
struct SFillColInfo* pFillCol, const char* id);
|
// struct SFillColInfo* pCol, const char* id);
|
||||||
|
//
|
||||||
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
||||||
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
|
//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
|
||||||
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
|
//int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
|
||||||
|
|
||||||
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
|
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
|
||||||
|
|
||||||
|
|
|
@ -427,6 +427,7 @@ typedef struct STableIntervalOperatorInfo {
|
||||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||||
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
||||||
STimeWindowAggSupp twAggSup;
|
STimeWindowAggSupp twAggSup;
|
||||||
|
struct SFillInfo* pFillInfo; // fill info
|
||||||
} STableIntervalOperatorInfo;
|
} STableIntervalOperatorInfo;
|
||||||
|
|
||||||
typedef struct SAggOperatorInfo {
|
typedef struct SAggOperatorInfo {
|
||||||
|
@ -467,7 +468,6 @@ typedef struct SFillOperatorInfo {
|
||||||
SSDataBlock* existNewGroupBlock;
|
SSDataBlock* existNewGroupBlock;
|
||||||
bool multigroupResult;
|
bool multigroupResult;
|
||||||
SInterval intervalInfo;
|
SInterval intervalInfo;
|
||||||
int32_t capacity;
|
|
||||||
} SFillOperatorInfo;
|
} SFillOperatorInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -609,7 +609,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||||
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
||||||
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
|
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
|
||||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
||||||
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||||
|
@ -621,7 +621,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
|
||||||
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList);
|
SArray* pColList);
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow* win);
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
|
@ -645,8 +645,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
||||||
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SExecTaskInfo* pTaskInfo);
|
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
|
|
@ -22,15 +22,18 @@ extern "C" {
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "tcommon.h"
|
||||||
|
|
||||||
struct SSDataBlock;
|
struct SSDataBlock;
|
||||||
|
|
||||||
typedef struct SFillColInfo {
|
typedef struct SFillColInfo {
|
||||||
STColumn col; // column info
|
// STColumn col; // column info
|
||||||
|
SResSchema col;
|
||||||
int16_t functionId; // sql function id
|
int16_t functionId; // sql function id
|
||||||
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
|
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
|
||||||
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
|
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
|
||||||
union {int64_t i; double d;} fillVal;
|
int32_t offset;
|
||||||
|
union {int64_t i; double d;} val;
|
||||||
} SFillColInfo;
|
} SFillColInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -57,7 +60,6 @@ typedef struct SFillInfo {
|
||||||
char * nextValues; // next row of data
|
char * nextValues; // next row of data
|
||||||
char** pData; // original result data block involved in filling data
|
char** pData; // original result data block involved in filling data
|
||||||
int32_t alloc; // data buffer size in rows
|
int32_t alloc; // data buffer size in rows
|
||||||
int8_t precision; // time resoluation
|
|
||||||
|
|
||||||
SFillColInfo* pFillCol; // column info for fill operations
|
SFillColInfo* pFillCol; // column info for fill operations
|
||||||
SFillTagColInfo* pTags; // tags value for filling gap
|
SFillTagColInfo* pTags; // tags value for filling gap
|
||||||
|
@ -67,7 +69,19 @@ typedef struct SFillInfo {
|
||||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
||||||
|
|
||||||
|
|
||||||
|
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||||
|
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||||
|
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||||
|
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val);
|
||||||
|
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||||
|
|
||||||
|
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
|
SInterval* pInterval, int32_t fillType,
|
||||||
|
struct SFillColInfo* pCol, const char* id);
|
||||||
|
|
||||||
|
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
|
||||||
|
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
|
||||||
|
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
|
@ -14,11 +14,12 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "functionMgt.h"
|
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
#include "functionMgt.h"
|
||||||
|
#include "os.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "os.h"
|
#include "tfill.h"
|
||||||
|
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -514,13 +515,12 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
|
||||||
return pResult;
|
return pResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, TSKEY ekey,
|
static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, bool ascQuery) {
|
||||||
bool ascQuery) {
|
|
||||||
if (ascQuery) {
|
if (ascQuery) {
|
||||||
getAlignQueryTimeWindow(pInterval, precision, ts, ts, ekey, w);
|
getAlignQueryTimeWindow(pInterval, precision, ts, w);
|
||||||
} else {
|
} else {
|
||||||
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
||||||
getAlignQueryTimeWindow(pInterval, precision, ts, ekey, ts, w);
|
getAlignQueryTimeWindow(pInterval, precision, ts, w);
|
||||||
|
|
||||||
int64_t key = w->skey;
|
int64_t key = w->skey;
|
||||||
while (key < ts) { // moving towards end
|
while (key < ts) { // moving towards end
|
||||||
|
@ -540,7 +540,7 @@ static STimeWindow getActiveTimeWindow(SDiskbasedBuf * pBuf, SResultRowInfo* pRe
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
|
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
|
||||||
getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true);
|
getInitialStartTimeWindow(pInterval, precision, ts, &w, true);
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
} else {
|
} else {
|
||||||
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
|
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
|
||||||
|
@ -2015,20 +2015,16 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// todo refactor : return window
|
// todo refactor : return window
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast,
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
|
||||||
STimeWindow* win) {
|
|
||||||
ASSERT(key >= keyFirst && key <= keyLast);
|
|
||||||
win->skey = taosTimeTruncate(key, pInterval, precision);
|
win->skey = taosTimeTruncate(key, pInterval, precision);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
|
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
|
||||||
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
|
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
|
||||||
*/
|
*/
|
||||||
if (keyFirst > (INT64_MAX - pInterval->interval)) {
|
win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||||
assert(keyLast - keyFirst < pInterval->interval);
|
if (win->ekey < win->skey) {
|
||||||
win->ekey = INT64_MAX;
|
win->ekey = INT64_MAX;
|
||||||
} else {
|
|
||||||
win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3176,10 +3172,12 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo*
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv));
|
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv));
|
||||||
pBlock->info.rows = numOfResult;
|
pBlock->info.rows = numOfResult;
|
||||||
|
blockDataUpdateTsWindow(pBlock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||||
int32_t* rowCellOffset) {
|
int32_t* rowCellOffset) {
|
||||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||||
|
|
||||||
|
@ -4813,7 +4811,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
|
doBuildResultDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
|
||||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5126,6 +5124,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
return pOperator->getStreamResFn(pOperator, newgroup);
|
return pOperator->getStreamResFn(pOperator, newgroup);
|
||||||
} else {
|
} else {
|
||||||
|
@ -5134,15 +5134,15 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
doBuildResultDatablock(pBlock, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
||||||
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
return pBlock->info.rows == 0 ? NULL : pBlock;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5155,7 +5155,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -5190,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
@ -5205,7 +5205,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
|
|
||||||
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5238,7 +5238,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
|
finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
|
|
||||||
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
|
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
|
||||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
|
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
|
||||||
|
|
||||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
@ -5292,7 +5292,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
||||||
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
|
@ -5383,7 +5383,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5415,7 +5415,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5432,7 +5432,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5464,7 +5464,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
|
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5509,14 +5509,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||||
blockDataCleanup(pInfo->pRes);
|
SSDataBlock* pResBlock = pInfo->pRes;
|
||||||
|
|
||||||
|
blockDataCleanup(pResBlock);
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||||
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
|
||||||
return pInfo->pRes;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||||
|
@ -5551,25 +5553,25 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p);
|
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p);
|
||||||
|
|
||||||
// current group has no more result to return
|
// current group has no more result to return
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pResBlock->info.rows > 0) {
|
||||||
// 1. The result in current group not reach the threshold of output result, continue
|
// 1. The result in current group not reach the threshold of output result, continue
|
||||||
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
||||||
if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
|
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
|
||||||
return pInfo->pRes;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||||
if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
|
if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||||
return pInfo->pRes;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||||
assert(pBlock != NULL);
|
assert(pBlock != NULL);
|
||||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
|
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||||
if (pInfo->pRes->info.rows > pResultInfo->threshold) {
|
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||||
return pInfo->pRes;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5615,7 +5617,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||||
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize);
|
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
||||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||||
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||||
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
||||||
|
@ -5878,8 +5880,7 @@ _error:
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -5890,10 +5891,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
pInfo->interval = *pInterval;
|
pInfo->interval = *pInterval;
|
||||||
pInfo->execModel = pTaskInfo->execModel;
|
pInfo->execModel = pTaskInfo->execModel;
|
||||||
pInfo->win = pTaskInfo->window;
|
pInfo->win = pTaskInfo->window;
|
||||||
pInfo->win.skey = 0;
|
|
||||||
pInfo->win.ekey = INT64_MAX;
|
|
||||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
|
||||||
pInfo->twAggSup = *pTwAggSupp;
|
pInfo->twAggSup = *pTwAggSupp;
|
||||||
|
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
@ -6063,18 +6062,14 @@ _error:
|
||||||
|
|
||||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
|
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
|
||||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
||||||
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal);
|
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL);
|
||||||
|
|
||||||
TSKEY sk = TMIN(win.skey, win.ekey);
|
|
||||||
TSKEY ek = TMAX(win.skey, win.ekey);
|
|
||||||
|
|
||||||
// TODO set correct time precision
|
// TODO set correct time precision
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w);
|
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w);
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding,
|
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
|
||||||
pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id);
|
|
||||||
|
|
||||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -6095,23 +6090,37 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
pInfo->multigroupResult = multigroupResult;
|
pInfo->multigroupResult = multigroupResult;
|
||||||
pInfo->intervalInfo = *pInterval;
|
pInfo->intervalInfo = *pInterval;
|
||||||
|
|
||||||
|
int32_t type = TSDB_FILL_NONE;
|
||||||
|
switch (fillType) {
|
||||||
|
case FILL_MODE_PREV: type = TSDB_FILL_PREV;break;
|
||||||
|
case FILL_MODE_NONE: type = TSDB_FILL_NONE;break;
|
||||||
|
case FILL_MODE_NULL: type = TSDB_FILL_NULL;break;
|
||||||
|
case FILL_MODE_NEXT: type = TSDB_FILL_NEXT;break;
|
||||||
|
case FILL_MODE_VALUE: type = TSDB_FILL_SET_VALUE;break;
|
||||||
|
case FILL_MODE_LINEAR: type = TSDB_FILL_LINEAR;break;
|
||||||
|
default:
|
||||||
|
type = TSDB_FILL_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||||
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity,
|
initResultSizeInfo(pOperator, 4096);
|
||||||
pTaskInfo->id.str, pInterval, fillType);
|
|
||||||
|
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity,
|
||||||
|
pTaskInfo->id.str, pInterval, type);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "FillOperator";
|
pOperator->name = "FillOperator";
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blockingOptr = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
// pOperator->operatorType = OP_Fill;
|
// pOperator->operatorType = OP_Fill;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExpr;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->_openFn = operatorDummyOpenFn;
|
pOperator->_openFn = operatorDummyOpenFn;
|
||||||
pOperator->getNextFn = doFill;
|
pOperator->getNextFn = doFill;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->closeFn = destroySFillOperatorInfo;
|
pOperator->closeFn = destroySFillOperatorInfo;
|
||||||
|
|
||||||
|
@ -6582,6 +6591,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
|
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo);
|
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo);
|
||||||
|
|
||||||
|
if (pIntervalPhyNode->pFill != NULL) {
|
||||||
|
pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode, NULL, false, pTaskInfo);
|
||||||
|
}
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
|
|
@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
doFilter(pInfo->pCondition, pRes);
|
doFilter(pInfo->pCondition, pRes);
|
||||||
|
|
||||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||||
|
|
|
@ -129,7 +129,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
// TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
|
// TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
|
||||||
|
|
||||||
if (true) {
|
if (true) {
|
||||||
getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, sk, ek, &w);
|
getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w);
|
||||||
assert(w.ekey >= pBlockInfo->window.skey);
|
assert(w.ekey >= pBlockInfo->window.skey);
|
||||||
|
|
||||||
if (w.ekey < pBlockInfo->window.ekey) {
|
if (w.ekey < pBlockInfo->window.ekey) {
|
||||||
|
|
|
@ -13,17 +13,18 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <function.h>
|
#include "function.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
|
||||||
#include "tfill.h"
|
#include "tfill.h"
|
||||||
#include "thash.h"
|
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
#include "thash.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
|
||||||
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
|
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
|
||||||
|
@ -41,7 +42,7 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
|
||||||
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
|
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
|
||||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||||
|
|
||||||
assert (pTag->col.colId == pCol->col.colId);
|
// assert (pTag->col.colId == pCol->col.colId);
|
||||||
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
|
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,7 +81,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
|
||||||
}
|
}
|
||||||
|
|
||||||
char* output = elePtrAt(data[i], pCol->col.bytes, index);
|
char* output = elePtrAt(data[i], pCol->col.bytes, index);
|
||||||
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
|
||||||
}
|
}
|
||||||
} else { // no prev value yet, set the value for NULL
|
} else { // no prev value yet, set the value for NULL
|
||||||
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
||||||
|
@ -96,7 +97,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
|
||||||
}
|
}
|
||||||
|
|
||||||
char* output = elePtrAt(data[i], pCol->col.bytes, index);
|
char* output = elePtrAt(data[i], pCol->col.bytes, index);
|
||||||
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
// assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
|
||||||
}
|
}
|
||||||
} else { // no prev value yet, set the value for NULL
|
} else { // no prev value yet, set the value for NULL
|
||||||
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
|
||||||
|
@ -119,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset};
|
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset};
|
||||||
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
|
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
|
||||||
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
|
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
|
||||||
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
|
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
|
||||||
|
@ -135,12 +136,13 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
|
||||||
}
|
}
|
||||||
|
|
||||||
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
|
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
|
||||||
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setTagsValue(pFillInfo, data, index);
|
setTagsValue(pFillInfo, data, index);
|
||||||
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit,
|
||||||
|
pFillInfo->interval.precision);
|
||||||
pFillInfo->numOfCurrent++;
|
pFillInfo->numOfCurrent++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +154,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
|
||||||
*next = taosMemoryCalloc(1, pFillInfo->rowSize);
|
*next = taosMemoryCalloc(1, pFillInfo->rowSize);
|
||||||
for (int i = 1; i < pFillInfo->numOfCols; i++) {
|
for (int i = 1; i < pFillInfo->numOfCols; i++) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes);
|
setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +162,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu
|
||||||
int32_t rowIndex = pFillInfo->index;
|
int32_t rowIndex = pFillInfo->index;
|
||||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
|
memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,21 +229,21 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
|
||||||
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) ||
|
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) ||
|
||||||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) {
|
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) {
|
||||||
assignVal(output, src, pCol->col.bytes, pCol->col.type);
|
assignVal(output, src, pCol->col.bytes, pCol->col.type);
|
||||||
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
|
memcpy(*prev + pCol->offset, src, pCol->col.bytes);
|
||||||
} else { // i > 0 and data is null , do interpolation
|
} else { // i > 0 and data is null , do interpolation
|
||||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||||
assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type);
|
||||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||||
assignVal(output, src, pCol->col.bytes, pCol->col.type);
|
assignVal(output, src, pCol->col.bytes, pCol->col.type);
|
||||||
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes);
|
memcpy(*prev + pCol->offset, src, pCol->col.bytes);
|
||||||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||||
if (*next) {
|
if (*next) {
|
||||||
assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type);
|
assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type);
|
||||||
} else {
|
} else {
|
||||||
setNull(output, pCol->col.type, pCol->col.bytes);
|
setNull(output, pCol->col.type, pCol->col.bytes);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
|
assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -250,7 +252,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
|
||||||
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
|
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
|
||||||
|
|
||||||
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
|
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
|
||||||
pFillInfo->interval.slidingUnit, pFillInfo->precision);
|
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
|
||||||
pFillInfo->index += 1;
|
pFillInfo->index += 1;
|
||||||
pFillInfo->numOfCurrent += 1;
|
pFillInfo->numOfCurrent += 1;
|
||||||
}
|
}
|
||||||
|
@ -301,7 +303,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
|
||||||
bool exists = false;
|
bool exists = false;
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
for (int32_t j = 0; j < k; ++j) {
|
for (int32_t j = 0; j < k; ++j) {
|
||||||
if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) {
|
if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) {
|
||||||
exists = true;
|
exists = true;
|
||||||
index = j;
|
index = j;
|
||||||
break;
|
break;
|
||||||
|
@ -310,7 +312,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
|
||||||
|
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
SSchema* pSchema = &pFillInfo->pTags[k].col;
|
SSchema* pSchema = &pFillInfo->pTags[k].col;
|
||||||
pSchema->colId = pColInfo->col.colId;
|
pSchema->colId = pColInfo->col.slotId;
|
||||||
pSchema->type = pColInfo->col.type;
|
pSchema->type = pColInfo->col.type;
|
||||||
pSchema->bytes = pColInfo->col.bytes;
|
pSchema->bytes = pColInfo->col.bytes;
|
||||||
|
|
||||||
|
@ -341,30 +343,40 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||||
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
|
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, const char* id) {
|
||||||
struct SFillColInfo* pCol, const char* id) {
|
|
||||||
if (fillType == TSDB_FILL_NONE) {
|
if (fillType == TSDB_FILL_NONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
|
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
|
||||||
|
if (pFillInfo == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosResetFillInfo(pFillInfo, skey);
|
taosResetFillInfo(pFillInfo, skey);
|
||||||
|
|
||||||
pFillInfo->order = order;
|
pFillInfo->order = order;
|
||||||
|
|
||||||
|
switch(fillType) {
|
||||||
|
case FILL_MODE_NONE: pFillInfo->type = TSDB_FILL_NONE; break;
|
||||||
|
case FILL_MODE_PREV: pFillInfo->type = TSDB_FILL_PREV; break;
|
||||||
|
case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break;
|
||||||
|
case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
|
||||||
|
case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break;
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pFillInfo->type = fillType;
|
pFillInfo->type = fillType;
|
||||||
pFillInfo->pFillCol = pCol;
|
pFillInfo->pFillCol = pCol;
|
||||||
pFillInfo->numOfTags = numOfTags;
|
pFillInfo->numOfTags = numOfTags;
|
||||||
pFillInfo->numOfCols = numOfCols;
|
pFillInfo->numOfCols = numOfCols;
|
||||||
pFillInfo->precision = precision;
|
|
||||||
pFillInfo->alloc = capacity;
|
pFillInfo->alloc = capacity;
|
||||||
pFillInfo->id = id;
|
pFillInfo->id = id;
|
||||||
|
pFillInfo->interval = *pInterval;
|
||||||
pFillInfo->interval.interval = slidingTime;
|
pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
|
||||||
pFillInfo->interval.intervalUnit = slidingUnit;
|
|
||||||
pFillInfo->interval.sliding = slidingTime;
|
|
||||||
pFillInfo->interval.slidingUnit = slidingUnit;
|
|
||||||
|
|
||||||
pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
|
|
||||||
|
|
||||||
// if (numOfTags > 0) {
|
// if (numOfTags > 0) {
|
||||||
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
|
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
|
||||||
|
@ -375,7 +387,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
|
||||||
|
|
||||||
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
|
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
|
||||||
assert(pFillInfo->rowSize > 0);
|
assert(pFillInfo->rowSize > 0);
|
||||||
|
|
||||||
return pFillInfo;
|
return pFillInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,7 +428,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
||||||
|
|
||||||
pFillInfo->end = endKey;
|
pFillInfo->end = endKey;
|
||||||
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
||||||
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->precision);
|
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->interval.precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
pFillInfo->index = 0;
|
pFillInfo->index = 0;
|
||||||
|
@ -433,7 +444,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
|
||||||
|
|
||||||
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
|
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
|
||||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||||
assert (pTag->col.colId == pCol->col.colId);
|
assert (pTag->col.colId == pCol->col.slotId);
|
||||||
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
|
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -460,7 +471,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
|
||||||
|
|
||||||
TSKEY ekey1 = ekey;
|
TSKEY ekey1 = ekey;
|
||||||
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
if (!FILL_IS_ASC_FILL(pFillInfo)) {
|
||||||
pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision);
|
pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->interval.precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t numOfRes = -1;
|
int64_t numOfRes = -1;
|
||||||
|
@ -471,7 +482,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
|
||||||
pFillInfo->currentKey,
|
pFillInfo->currentKey,
|
||||||
pFillInfo->interval.sliding,
|
pFillInfo->interval.sliding,
|
||||||
pFillInfo->interval.slidingUnit,
|
pFillInfo->interval.slidingUnit,
|
||||||
pFillInfo->precision);
|
pFillInfo->interval.precision);
|
||||||
numOfRes += 1;
|
numOfRes += 1;
|
||||||
assert(numOfRes >= numOfRows);
|
assert(numOfRes >= numOfRows);
|
||||||
} else { // reach the end of data
|
} else { // reach the end of data
|
||||||
|
@ -484,7 +495,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
|
||||||
pFillInfo->currentKey,
|
pFillInfo->currentKey,
|
||||||
pFillInfo->interval.sliding,
|
pFillInfo->interval.sliding,
|
||||||
pFillInfo->interval.slidingUnit,
|
pFillInfo->interval.slidingUnit,
|
||||||
pFillInfo->precision);
|
pFillInfo->interval.precision);
|
||||||
numOfRes += 1;
|
numOfRes += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -527,7 +538,7 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
|
||||||
return pFillInfo->start;
|
return pFillInfo->start;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal) {
|
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) {
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
|
||||||
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
|
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
|
||||||
|
@ -538,14 +549,15 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co
|
||||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
SExprInfo* pExprInfo = &pExpr[i];
|
SExprInfo* pExprInfo = &pExpr[i];
|
||||||
|
|
||||||
pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes;
|
pFillCol[i].col = pExprInfo->base.resSchema;
|
||||||
pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type;
|
pFillCol[i].offset = offset;
|
||||||
pFillCol[i].col.offset = offset;
|
|
||||||
pFillCol[i].col.colId = pExprInfo->base.resSchema.slotId;
|
|
||||||
pFillCol[i].tagIndex = -2;
|
pFillCol[i].tagIndex = -2;
|
||||||
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
|
|
||||||
|
if (pExprInfo->base.numOfParams > 0) {
|
||||||
|
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
|
||||||
|
}
|
||||||
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
|
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
|
||||||
pFillCol[i].fillVal.i = fillVal[i];
|
// pFillCol[i].val.d = *val;
|
||||||
|
|
||||||
offset += pExprInfo->base.resSchema.bytes;
|
offset += pExprInfo->base.resSchema.bytes;
|
||||||
}
|
}
|
|
@ -19,14 +19,14 @@
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
//#include "tfill.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
#include "tfill.h"
|
|
||||||
#include "thistogram.h"
|
|
||||||
#include "ttszip.h"
|
|
||||||
#include "tpercentile.h"
|
|
||||||
#include "tbuffer.h"
|
#include "tbuffer.h"
|
||||||
#include "tcompression.h"
|
#include "tcompression.h"
|
||||||
|
#include "thistogram.h"
|
||||||
|
#include "tpercentile.h"
|
||||||
|
#include "ttszip.h"
|
||||||
//#include "queryLog.h"
|
//#include "queryLog.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tudf.h"
|
#include "tudf.h"
|
||||||
|
@ -3608,7 +3608,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
|
||||||
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
|
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
|
||||||
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
||||||
} else {
|
} else {
|
||||||
taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
|
// taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
||||||
|
@ -3681,7 +3681,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
|
||||||
if (isNull(start, srcType) || isNull(end, srcType)) {
|
if (isNull(start, srcType) || isNull(end, srcType)) {
|
||||||
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
||||||
} else {
|
} else {
|
||||||
taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType);
|
// taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
||||||
|
|
|
@ -45,7 +45,7 @@ sql explain select * from information_schema.user_stables;
|
||||||
sql explain select count(*),sum(f1) from tb1;
|
sql explain select count(*),sum(f1) from tb1;
|
||||||
sql explain select count(*),sum(f1) from st1;
|
sql explain select count(*),sum(f1) from st1;
|
||||||
sql explain select count(*),sum(f1) from st1 group by f1;
|
sql explain select count(*),sum(f1) from st1 group by f1;
|
||||||
sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
|
sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
|
||||||
|
|
||||||
print ======== step3
|
print ======== step3
|
||||||
|
@ -65,7 +65,7 @@ sql explain analyze select * from information_schema.user_stables;
|
||||||
sql explain analyze select count(*),sum(f1) from tb1;
|
sql explain analyze select count(*),sum(f1) from tb1;
|
||||||
sql explain analyze select count(*),sum(f1) from st1;
|
sql explain analyze select count(*),sum(f1) from st1;
|
||||||
sql explain analyze select count(*),sum(f1) from st1 group by f1;
|
sql explain analyze select count(*),sum(f1) from st1 group by f1;
|
||||||
sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
||||||
|
|
||||||
print ======== step5
|
print ======== step5
|
||||||
|
@ -78,7 +78,7 @@ sql explain analyze verbose true select * from information_schema.user_stables;
|
||||||
sql explain analyze verbose true select count(*),sum(f1) from tb1;
|
sql explain analyze verbose true select count(*),sum(f1) from tb1;
|
||||||
sql explain analyze verbose true select count(*),sum(f1) from st1;
|
sql explain analyze verbose true select count(*),sum(f1) from st1;
|
||||||
sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1;
|
sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1;
|
||||||
sql explain analyze verbose true select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
#sql explain analyze verbose true select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
sql explain analyze verbose true select ts from tb1 where f1 > 0;
|
sql explain analyze verbose true select ts from tb1 where f1 > 0;
|
||||||
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
||||||
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
|
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
|
||||||
|
|
Loading…
Reference in New Issue