Merge pull request #15801 from taosdata/feature/3.0_fill_enh
fix(query): fix bug in fill
This commit is contained in:
commit
3319c0937b
|
@ -27,10 +27,6 @@ else ()
|
|||
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
if(TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif()
|
||||
|
||||
# pthread
|
||||
if(${BUILD_PTHREAD})
|
||||
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
|
@ -396,19 +392,6 @@ if(${BUILD_WITH_SQLITE})
|
|||
endif(NOT TD_WINDOWS)
|
||||
endif(${BUILD_WITH_SQLITE})
|
||||
|
||||
# jemalloc
|
||||
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(jemalloc
|
||||
PREFIX "jemalloc"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
|
||||
BUILD_COMMAND ${MAKE}
|
||||
)
|
||||
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
|
||||
ENDIF ()
|
||||
|
||||
# addr2line
|
||||
if(${BUILD_ADDR2LINE})
|
||||
if(NOT ${TD_WINDOWS})
|
||||
|
|
|
@ -213,6 +213,8 @@ typedef struct SWindowLogicNode {
|
|||
typedef struct SFillLogicNode {
|
||||
SLogicNode node;
|
||||
EFillMode mode;
|
||||
SNodeList* pFillExprs;
|
||||
SNodeList* pNotFillExprs;
|
||||
SNode* pWStartTs;
|
||||
SNode* pValues; // SNodeListNode
|
||||
STimeWindow timeRange;
|
||||
|
@ -440,9 +442,10 @@ typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode;
|
|||
typedef struct SFillPhysiNode {
|
||||
SPhysiNode node;
|
||||
EFillMode mode;
|
||||
SNodeList* pFillExprs;
|
||||
SNodeList* pNotFillExprs;
|
||||
SNode* pWStartTs; // SColumnNode
|
||||
SNode* pValues; // SNodeListNode
|
||||
SNodeList* pTargets;
|
||||
STimeWindow timeRange;
|
||||
EOrder inputTsOrder;
|
||||
} SFillPhysiNode;
|
||||
|
|
|
@ -53,7 +53,13 @@ typedef struct SExprNode {
|
|||
bool orderAlias;
|
||||
} SExprNode;
|
||||
|
||||
typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG, COLUMN_TYPE_TBNAME } EColumnType;
|
||||
typedef enum EColumnType {
|
||||
COLUMN_TYPE_COLUMN = 1,
|
||||
COLUMN_TYPE_TAG,
|
||||
COLUMN_TYPE_TBNAME,
|
||||
COLUMN_TYPE_WINDOW_PC,
|
||||
COLUMN_TYPE_GROUP_KEY
|
||||
} EColumnType;
|
||||
|
||||
typedef struct SColumnNode {
|
||||
SExprNode node; // QUERY_NODE_COLUMN
|
||||
|
@ -293,6 +299,7 @@ typedef enum ESqlClause {
|
|||
SQL_CLAUSE_WHERE,
|
||||
SQL_CLAUSE_PARTITION_BY,
|
||||
SQL_CLAUSE_WINDOW,
|
||||
SQL_CLAUSE_FILL,
|
||||
SQL_CLAUSE_GROUP_BY,
|
||||
SQL_CLAUSE_HAVING,
|
||||
SQL_CLAUSE_DISTINCT,
|
||||
|
|
|
@ -619,15 +619,20 @@ typedef struct SIndefOperatorInfo {
|
|||
typedef struct SFillOperatorInfo {
|
||||
struct SFillInfo* pFillInfo;
|
||||
SSDataBlock* pRes;
|
||||
SSDataBlock* pFinalRes;
|
||||
int64_t totalInputRows;
|
||||
void** p;
|
||||
SSDataBlock* existNewGroupBlock;
|
||||
bool multigroupResult;
|
||||
STimeWindow win;
|
||||
SNode* pCondition;
|
||||
SArray* pColMatchColInfo;
|
||||
int32_t primaryTsCol;
|
||||
int32_t primarySrcSlotId;
|
||||
uint64_t curGroupId; // current handled group id
|
||||
SExprInfo* pExprInfo;
|
||||
int32_t numOfExpr;
|
||||
SExprInfo* pNotFillExprInfo;
|
||||
int32_t numOfNotFillExpr;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
|
|
|
@ -28,8 +28,7 @@ struct SSDataBlock;
|
|||
|
||||
typedef struct SFillColInfo {
|
||||
SExprInfo *pExpr;
|
||||
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
|
||||
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
|
||||
bool notFillCol; // denote if this column needs fill operation
|
||||
SVariant fillVal;
|
||||
} SFillColInfo;
|
||||
|
||||
|
@ -46,25 +45,27 @@ typedef struct {
|
|||
char* tagVal;
|
||||
} SFillTagColInfo;
|
||||
|
||||
typedef struct {
|
||||
int64_t key;
|
||||
SArray* pRowVal;
|
||||
} SRowVal;
|
||||
|
||||
typedef struct SFillInfo {
|
||||
TSKEY start; // start timestamp
|
||||
TSKEY end; // endKey for fill
|
||||
TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure.
|
||||
int32_t tsSlotId; // primary time stamp slot id
|
||||
int32_t srcTsSlotId; // timestamp column id in the source data block.
|
||||
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
|
||||
int32_t type; // fill type
|
||||
int32_t numOfRows; // number of rows in the input data block
|
||||
int32_t index; // active row index
|
||||
int32_t numOfTotal; // number of filled rows in one round
|
||||
int32_t numOfCurrent; // number of filled rows in current results
|
||||
|
||||
int32_t numOfTags; // number of tags
|
||||
int32_t numOfCols; // number of columns, including the tags columns
|
||||
int32_t rowSize; // size of each row
|
||||
SInterval interval;
|
||||
|
||||
SArray *prev;
|
||||
SArray *next;
|
||||
SRowVal prev;
|
||||
SRowVal next;
|
||||
SSDataBlock *pSrcBlock;
|
||||
int32_t alloc; // data buffer size in rows
|
||||
|
||||
|
@ -79,10 +80,10 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
|
|||
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val);
|
||||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr, int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
int32_t order, const char* id);
|
||||
|
||||
|
|
|
@ -3216,36 +3216,71 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
|||
}
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
int64_t ekey =
|
||||
Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
|
||||
pInfo->pRes->info.groupId = pInfo->curGroupId;
|
||||
return;
|
||||
}
|
||||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
pInfo->pRes->info.groupId = pBlock->info.groupId;
|
||||
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
|
||||
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
|
||||
|
||||
for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
||||
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
|
||||
ASSERT(pCol->notFillCol);
|
||||
|
||||
SExprInfo* pExpr = pCol->pExpr;
|
||||
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
|
||||
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
|
||||
colDataAssign(pDst1, pSrc1, pInfo->pRes->info.rows, &pResBlock->info);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3254,11 +3289,16 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pRes;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > 0) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
|
@ -3269,21 +3309,21 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
} else {
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
|
||||
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
|
||||
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
|
||||
pInfo->curGroupId = pBlock->info.groupId; // the first data block
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
|
||||
pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block
|
||||
pInfo->totalInputRows += pInfo->pRes->info.rows;
|
||||
|
||||
pInfo->totalInputRows += pBlock->info.rows;
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
} else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
|
||||
|
@ -3293,8 +3333,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
|
@ -3307,14 +3345,18 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
return pResBlock;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||
pResBlock->info.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
|
@ -3605,6 +3647,13 @@ void destroyFillOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);
|
||||
|
||||
if (pInfo->pNotFillExprInfo != NULL) {
|
||||
destroyExprInfo(pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr);
|
||||
taosMemoryFree(pInfo->pNotFillExprInfo);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pInfo->p);
|
||||
taosArrayDestroy(pInfo->pColMatchColInfo);
|
||||
taosMemoryFreeClear(param);
|
||||
|
@ -3637,16 +3686,16 @@ void doDestroyExchangeOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
|
||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType,
|
||||
int32_t order) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
|
||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
||||
|
||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
|
||||
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||
|
||||
pInfo->pFillInfo =
|
||||
taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
|
||||
taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
|
||||
|
||||
pInfo->win = win;
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
|
@ -3668,9 +3717,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
||||
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
|
@ -3681,19 +3731,27 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||
initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
|
||||
|
||||
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
|
||||
int32_t code =
|
||||
initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues,
|
||||
pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
|
||||
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
|
@ -3701,7 +3759,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
|
|
@ -33,39 +33,29 @@
|
|||
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) \
|
||||
((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
|
||||
|
||||
static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
|
||||
for (int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
|
||||
if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
|
||||
char* val1 = elePtrAt(data[j], pSchema->bytes, genRows);
|
||||
|
||||
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
|
||||
assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
|
||||
}
|
||||
}
|
||||
|
||||
static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) {
|
||||
// the first are always the timestamp column, so start from the second column.
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP) { // handle timestamp
|
||||
colDataAppend(p, rowIndex, (const char*)&ts, false);
|
||||
} else {
|
||||
colDataAppendNULL(p, rowIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
|
||||
#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId)
|
||||
|
||||
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
||||
|
||||
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
|
||||
SColumnInfoData* pDstColInfo = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
if (pCol->notFillCol) {
|
||||
if (pDstColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstColInfo, rowIndex, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDstColInfo, rowIndex, pKey);
|
||||
}
|
||||
} else {
|
||||
colDataAppendNULL(pDstColInfo, rowIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) {
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = 0;
|
||||
|
@ -96,13 +86,10 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
|
||||
// set the other values
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo)? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
||||
|
||||
|
@ -114,14 +101,10 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
}
|
||||
}
|
||||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
|
||||
// todo refactor: start from 0 not 1
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
||||
|
||||
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
|
@ -134,59 +117,70 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
|||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
// TODO : linear interpolation supports NULL value
|
||||
if (outOfBound) {
|
||||
setNullRow(pBlock, pFillInfo->currentKey, index);
|
||||
setNullRow(pBlock, pFillInfo, index);
|
||||
} else {
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
int16_t type = pDstCol->info.type;
|
||||
|
||||
int16_t type = pDstCol->info.type;
|
||||
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
|
||||
continue;
|
||||
if (pCol->notFillCol) {
|
||||
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDstCol, index, pKey);
|
||||
}
|
||||
} else {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i);
|
||||
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
|
||||
colDataAppendNULL(pDstCol, index);
|
||||
continue;
|
||||
}
|
||||
|
||||
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev.pRowVal, pFillInfo->tsSlotId);
|
||||
|
||||
int64_t prevTs = *(int64_t*)pKey1->pData;
|
||||
int32_t srcSlotId = GET_DEST_SLOT_ID(pCol);
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||
char* data = colDataGetData(pSrcCol, pFillInfo->index);
|
||||
|
||||
point1 = (SPoint){.key = prevTs, .val = pKey->pData};
|
||||
point2 = (SPoint){.key = ts, .val = data};
|
||||
|
||||
int64_t out = 0;
|
||||
point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
|
||||
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
|
||||
|
||||
colDataAppend(pDstCol, index, (const char*)&out, false);
|
||||
}
|
||||
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
|
||||
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
|
||||
colDataAppendNULL(pDstCol, index);
|
||||
continue;
|
||||
}
|
||||
|
||||
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId);
|
||||
|
||||
int64_t prevTs = *(int64_t*)pKey1->pData;
|
||||
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||
char* data = colDataGetData(pSrcCol, pFillInfo->index);
|
||||
|
||||
point1 = (SPoint){.key = prevTs, .val = pKey->pData};
|
||||
point2 = (SPoint){.key = ts, .val = data};
|
||||
|
||||
int64_t out = 0;
|
||||
point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
|
||||
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
|
||||
|
||||
colDataAppend(pDstCol, index, (const char*)&out, false);
|
||||
}
|
||||
}
|
||||
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
|
||||
setNullRow(pBlock, pFillInfo->currentKey, index);
|
||||
setNullRow(pBlock, pFillInfo, index);
|
||||
} else { // fill with user specified value for each column
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
|
||||
int32_t slotId = GET_DEST_SLOT_ID(pCol);
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
if (pCol->notFillCol) {
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDst, index, pKey);
|
||||
}
|
||||
} else {
|
||||
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
|
||||
doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,7 +201,7 @@ void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey
|
|||
}
|
||||
|
||||
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
|
||||
if (taosArrayGetSize(pFillInfo->next) > 0) {
|
||||
if (taosArrayGetSize(pFillInfo->next.pRowVal) > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -221,10 +215,10 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
|
|||
key.bytes = pSchema->bytes;
|
||||
key.type = pSchema->type;
|
||||
|
||||
taosArrayPush(pFillInfo->next, &key);
|
||||
taosArrayPush(pFillInfo->next.pRowVal, &key);
|
||||
|
||||
key.pData = taosMemoryMalloc(pSchema->bytes);
|
||||
taosArrayPush(pFillInfo->prev, &key);
|
||||
taosArrayPush(pFillInfo->prev.pRowVal, &key);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -232,20 +226,31 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
|
|||
|
||||
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]);
|
||||
int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType;
|
||||
if (type == QUERY_NODE_COLUMN) {
|
||||
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
saveColData(pRow, i, p, isNull);
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
saveColData(pRow, i, p, isNull);
|
||||
} else if (type == QUERY_NODE_OPERATOR) {
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, i);
|
||||
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
saveColData(pRow, i, p, isNull);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->tsSlotId);
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
||||
|
@ -259,7 +264,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
|
||||
// set the next value for interpolation
|
||||
if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
|
||||
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
|
||||
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next.pRowVal);
|
||||
}
|
||||
|
||||
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
|
||||
|
@ -281,43 +286,38 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
|
||||
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
|
||||
int32_t nextRowIndex = pFillInfo->index + 1;
|
||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next);
|
||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next.pRowVal);
|
||||
}
|
||||
|
||||
// assign rows to dst buffer
|
||||
// copy rows to dst buffer
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
|
||||
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
|
||||
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId);
|
||||
|
||||
char* src = colDataGetData(pSrc, pFillInfo->index);
|
||||
if (/*i == 0 || (*/ !colDataIsNull_s(pSrc, pFillInfo->index)) {
|
||||
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
|
||||
colDataAppend(pDst, index, src, isNull);
|
||||
saveColData(pFillInfo->prev, i, src, isNull);
|
||||
} else {
|
||||
if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
|
||||
colDataAppend(pDst, index, src, false);
|
||||
saveColData(pFillInfo->prev.pRowVal, i, src, false);
|
||||
} else { // the value is null
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
|
||||
} else { // i > 0 and data is null , do interpolation
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDst, index, pKey);
|
||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
|
||||
colDataAppend(pDst, index, src, isNull);
|
||||
saveColData(pFillInfo->prev, i, src, isNull); // todo:
|
||||
saveColData(pFillInfo->prev.pRowVal, i, src, isNull); // todo:
|
||||
} else if (pFillInfo->type == TSDB_FILL_NULL) {
|
||||
colDataAppendNULL(pDst, index);
|
||||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
doSetVal(pDst, index, pKey);
|
||||
} else {
|
||||
|
@ -340,10 +340,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
}
|
||||
|
||||
if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
|
||||
/* the raw data block is exhausted, next value does not exists */
|
||||
// if (pFillInfo->index >= pFillInfo->numOfRows) {
|
||||
// taosMemoryFreeClear(*next);
|
||||
// }
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
return pFillInfo->numOfCurrent;
|
||||
}
|
||||
|
@ -357,7 +353,11 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
|
|||
if (isNull) {
|
||||
pKey->isNull = true;
|
||||
} else {
|
||||
memcpy(pKey->pData, src, pKey->bytes);
|
||||
if (IS_VAR_DATA_TYPE(pKey->type)) {
|
||||
memcpy(pKey->pData, src, varDataTLen(src));
|
||||
} else {
|
||||
memcpy(pKey->pData, src, pKey->bytes);
|
||||
}
|
||||
pKey->isNull = false;
|
||||
}
|
||||
}
|
||||
|
@ -378,53 +378,6 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int
|
|||
return resultCapacity;
|
||||
}
|
||||
|
||||
// there are no duplicated tags in the SFillTagColInfo list
|
||||
static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) {
|
||||
int32_t rowsize = 0;
|
||||
int32_t numOfTags = 0;
|
||||
|
||||
int32_t k = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
|
||||
SResSchema* pSchema = &pColInfo->pExpr->base.resSchema;
|
||||
|
||||
if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) {
|
||||
numOfTags += 1;
|
||||
|
||||
bool exists = false;
|
||||
int32_t index = -1;
|
||||
for (int32_t j = 0; j < k; ++j) {
|
||||
if (pFillInfo->pTags[j].col.colId == pSchema->slotId) {
|
||||
exists = true;
|
||||
index = j;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!exists) {
|
||||
SSchema* pSchema1 = &pFillInfo->pTags[k].col;
|
||||
pSchema1->colId = pSchema->slotId;
|
||||
pSchema1->type = pSchema->type;
|
||||
pSchema1->bytes = pSchema->bytes;
|
||||
|
||||
pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes);
|
||||
pColInfo->tagIndex = k;
|
||||
|
||||
k += 1;
|
||||
} else {
|
||||
pColInfo->tagIndex = index;
|
||||
}
|
||||
}
|
||||
|
||||
rowsize += pSchema->bytes;
|
||||
}
|
||||
|
||||
pFillInfo->numOfTags = numOfTags;
|
||||
|
||||
assert(k <= pFillInfo->numOfTags);
|
||||
return rowsize;
|
||||
}
|
||||
|
||||
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||
if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) {
|
||||
return 0;
|
||||
|
@ -433,7 +386,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
|||
return pFillInfo->numOfRows - pFillInfo->index;
|
||||
}
|
||||
|
||||
struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
|
||||
struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||
int32_t primaryTsSlotId, int32_t order, const char* id) {
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
|
@ -447,7 +400,17 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa
|
|||
}
|
||||
|
||||
pFillInfo->order = order;
|
||||
pFillInfo->tsSlotId = primaryTsSlotId;
|
||||
pFillInfo->srcTsSlotId = primaryTsSlotId;
|
||||
|
||||
for(int32_t i = 0; i < numOfNotFillCols; ++i) {
|
||||
SFillColInfo* p = &pCol[i + numOfFillCols];
|
||||
int32_t srcSlotId = GET_DEST_SLOT_ID(p);
|
||||
if (srcSlotId == primaryTsSlotId) {
|
||||
pFillInfo->tsSlotId = i + numOfFillCols;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosResetFillInfo(pFillInfo, skey);
|
||||
|
||||
switch (fillType) {
|
||||
|
@ -476,26 +439,15 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa
|
|||
|
||||
pFillInfo->type = fillType;
|
||||
pFillInfo->pFillCol = pCol;
|
||||
pFillInfo->numOfTags = numOfTags;
|
||||
pFillInfo->numOfCols = numOfCols;
|
||||
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols;
|
||||
pFillInfo->alloc = capacity;
|
||||
pFillInfo->id = id;
|
||||
pFillInfo->interval = *pInterval;
|
||||
|
||||
// if (numOfTags > 0) {
|
||||
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pFillInfo->pTags[i].col.colId = -2; // TODO
|
||||
}
|
||||
// }
|
||||
|
||||
pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys));
|
||||
pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys));
|
||||
pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
|
||||
pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
|
||||
|
||||
initBeforeAfterDataBuf(pFillInfo);
|
||||
|
||||
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
|
||||
assert(pFillInfo->rowSize > 0);
|
||||
return pFillInfo;
|
||||
}
|
||||
|
||||
|
@ -513,20 +465,20 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
|
|||
if (pFillInfo == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev); ++i) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev.pRowVal); ++i) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i);
|
||||
taosMemoryFree(pKey->pData);
|
||||
}
|
||||
taosArrayDestroy(pFillInfo->prev);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next); ++i) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i);
|
||||
taosArrayDestroy(pFillInfo->prev.pRowVal);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next.pRowVal); ++i) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->next.pRowVal, i);
|
||||
taosMemoryFree(pKey->pData);
|
||||
}
|
||||
taosArrayDestroy(pFillInfo->next);
|
||||
taosArrayDestroy(pFillInfo->next.pRowVal);
|
||||
|
||||
for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
|
||||
taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
|
||||
}
|
||||
// for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
|
||||
// taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
|
||||
// }
|
||||
|
||||
taosMemoryFreeClear(pFillInfo->pTags);
|
||||
taosMemoryFreeClear(pFillInfo->pFillCol);
|
||||
|
@ -576,7 +528,7 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
|
|||
}
|
||||
|
||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
|
||||
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
||||
|
||||
int64_t* tsList = (int64_t*)pCol->pData;
|
||||
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
|
||||
|
@ -642,17 +594,18 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
|
|||
|
||||
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; }
|
||||
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) {
|
||||
SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillExpr, const struct SNodeListNode* pValNode) {
|
||||
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNotFillExpr, sizeof(SFillColInfo));
|
||||
if (pFillCol == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t len = (pValNode != NULL) ? LIST_LENGTH(pValNode->pNodeList) : 0;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
for (int32_t i = 0; i < numOfFillExpr; ++i) {
|
||||
SExprInfo* pExprInfo = &pExpr[i];
|
||||
pFillCol[i].pExpr = pExprInfo;
|
||||
pFillCol[i].tagIndex = -2;
|
||||
pFillCol[i].notFillCol = false;
|
||||
|
||||
// todo refactor
|
||||
if (len > 0) {
|
||||
|
@ -662,10 +615,12 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const str
|
|||
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
|
||||
nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
}
|
||||
}
|
||||
|
||||
if (pExprInfo->base.numOfParams > 0) {
|
||||
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
|
||||
}
|
||||
for(int32_t i = 0; i < numOfNotFillExpr; ++i) {
|
||||
SExprInfo* pExprInfo = &pNotFillExpr[i];
|
||||
pFillCol[i + numOfFillExpr].pExpr = pExprInfo;
|
||||
pFillCol[i + numOfFillExpr].notFillCol = true;
|
||||
}
|
||||
|
||||
return pFillCol;
|
||||
|
|
|
@ -2675,10 +2675,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pPrevRow = NULL;
|
||||
pInfo->pNextRow = NULL;
|
||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
pInfo->pLinearInfo = NULL;
|
||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
pInfo->win = pInterpPhyNode->timeRange;
|
||||
pInfo->interval.interval = pInterpPhyNode->interval;
|
||||
|
|
|
@ -451,6 +451,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
|
|||
static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
COPY_SCALAR_FIELD(mode);
|
||||
CLONE_NODE_LIST_FIELD(pFillExprs);
|
||||
CLONE_NODE_LIST_FIELD(pNotFillExprs);
|
||||
CLONE_NODE_FIELD(pWStartTs);
|
||||
CLONE_NODE_FIELD(pValues);
|
||||
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
|
||||
|
|
|
@ -2086,9 +2086,10 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkFillPhysiPlanMode = "Mode";
|
||||
static const char* jkFillPhysiPlanFillExprs = "FillExprs";
|
||||
static const char* jkFillPhysiPlanNotFillExprs = "NotFillExprs";
|
||||
static const char* jkFillPhysiPlanWStartTs = "WStartTs";
|
||||
static const char* jkFillPhysiPlanValues = "Values";
|
||||
static const char* jkFillPhysiPlanTargets = "Targets";
|
||||
static const char* jkFillPhysiPlanStartTime = "StartTime";
|
||||
static const char* jkFillPhysiPlanEndTime = "EndTime";
|
||||
static const char* jkFillPhysiPlanInputTsOrder = "inputTsOrder";
|
||||
|
@ -2100,15 +2101,18 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanMode, pNode->mode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkFillPhysiPlanFillExprs, pNode->pFillExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkFillPhysiPlanNotFillExprs, pNode->pNotFillExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkFillPhysiPlanWStartTs, nodeToJson, pNode->pWStartTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkFillPhysiPlanValues, nodeToJson, pNode->pValues);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkFillPhysiPlanTargets, pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanStartTime, pNode->timeRange.skey);
|
||||
}
|
||||
|
@ -2128,7 +2132,12 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
|
|||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode, code);
|
||||
;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkFillPhysiPlanFillExprs, &pNode->pFillExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkFillPhysiPlanNotFillExprs, &pNode->pNotFillExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkFillPhysiPlanWStartTs, &pNode->pWStartTs);
|
||||
|
@ -2136,9 +2145,6 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkFillPhysiPlanValues, &pNode->pValues);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkFillPhysiPlanTargets, &pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanStartTime, &pNode->timeRange.skey);
|
||||
}
|
||||
|
|
|
@ -346,6 +346,7 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa
|
|||
if (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)) {
|
||||
nodesWalkExpr(((SIntervalWindowNode*)pSelect->pWindow)->pFill, walker, pContext);
|
||||
}
|
||||
case SQL_CLAUSE_FILL:
|
||||
nodesWalkExprs(pSelect->pGroupByList, walker, pContext);
|
||||
case SQL_CLAUSE_GROUP_BY:
|
||||
nodesWalkExpr(pSelect->pHaving, walker, pContext);
|
||||
|
@ -379,6 +380,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
|
|||
if (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)) {
|
||||
nodesRewriteExpr(&(((SIntervalWindowNode*)pSelect->pWindow)->pFill), rewriter, pContext);
|
||||
}
|
||||
case SQL_CLAUSE_FILL:
|
||||
nodesRewriteExprs(pSelect->pGroupByList, rewriter, pContext);
|
||||
case SQL_CLAUSE_GROUP_BY:
|
||||
nodesRewriteExpr(&(pSelect->pHaving), rewriter, pContext);
|
||||
|
|
|
@ -931,9 +931,10 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
|
||||
SFillPhysiNode* pPhyNode = (SFillPhysiNode*)pNode;
|
||||
destroyPhysiNode((SPhysiNode*)pPhyNode);
|
||||
nodesDestroyList(pPhyNode->pFillExprs);
|
||||
nodesDestroyList(pPhyNode->pNotFillExprs);
|
||||
nodesDestroyNode(pPhyNode->pWStartTs);
|
||||
nodesDestroyNode(pPhyNode->pValues);
|
||||
nodesDestroyList(pPhyNode->pTargets);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
|
||||
|
|
|
@ -38,6 +38,27 @@ typedef struct SRewriteExprCxt {
|
|||
SNodeList* pExprs;
|
||||
} SRewriteExprCxt;
|
||||
|
||||
static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol) {
|
||||
switch (pFunc->funcType) {
|
||||
case FUNCTION_TYPE_TBNAME:
|
||||
pCol->colType = COLUMN_TYPE_TBNAME;
|
||||
break;
|
||||
case FUNCTION_TYPE_WSTART:
|
||||
case FUNCTION_TYPE_WEND:
|
||||
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_PC;
|
||||
break;
|
||||
case FUNCTION_TYPE_WDURATION:
|
||||
pCol->colType = COLUMN_TYPE_WINDOW_PC;
|
||||
break;
|
||||
case FUNCTION_TYPE_GROUP_KEY:
|
||||
pCol->colType = COLUMN_TYPE_GROUP_KEY;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
||||
switch (nodeType(*pNode)) {
|
||||
case QUERY_NODE_OPERATOR:
|
||||
|
@ -60,11 +81,7 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
|||
strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName);
|
||||
strcpy(pCol->colName, ((SExprNode*)pExpr)->aliasName);
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
|
||||
if (FUNCTION_TYPE_WSTART == ((SFunctionNode*)pExpr)->funcType) {
|
||||
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
} else if (FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pExpr)->funcType) {
|
||||
pCol->colType = COLUMN_TYPE_TBNAME;
|
||||
}
|
||||
setColumnInfo((SFunctionNode*)pExpr, pCol);
|
||||
}
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = (SNode*)pCol;
|
||||
|
@ -764,6 +781,57 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (COLUMN_TYPE_WINDOW_PC != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) {
|
||||
*(bool*)pContext = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static bool needFillValue(SNode* pNode) {
|
||||
bool hasFillCol = false;
|
||||
nodesWalkExpr(pNode, needFillValueImpl, &hasFillCol);
|
||||
return hasFillCol;
|
||||
}
|
||||
|
||||
static int32_t partFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pProject = NULL;
|
||||
FOREACH(pProject, pSelect->pProjectionList) {
|
||||
if (needFillValue(pProject)) {
|
||||
code = nodesListMakeStrictAppend(pFillExprs, nodesCloneNode(pProject));
|
||||
} else if (QUERY_NODE_VALUE != nodeType(pProject)) {
|
||||
code = nodesListMakeStrictAppend(pNotFillExprs, nodesCloneNode(pProject));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
NODES_DESTORY_LIST(*pFillExprs);
|
||||
NODES_DESTORY_LIST(*pNotFillExprs);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!pSelect->isDistinct) {
|
||||
SNode* pOrderExpr = NULL;
|
||||
FOREACH(pOrderExpr, pSelect->pOrderByList) {
|
||||
SNode* pExpr = ((SOrderByExprNode*)pOrderExpr)->pExpr;
|
||||
if (needFillValue(pExpr)) {
|
||||
code = nodesListMakeStrictAppend(pFillExprs, nodesCloneNode(pExpr));
|
||||
} else if (QUERY_NODE_VALUE != nodeType(pExpr)) {
|
||||
code = nodesListMakeStrictAppend(pNotFillExprs, nodesCloneNode(pExpr));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
NODES_DESTORY_LIST(*pFillExprs);
|
||||
NODES_DESTORY_LIST(*pNotFillExprs);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) ||
|
||||
NULL == ((SIntervalWindowNode*)pSelect->pWindow)->pFill) {
|
||||
|
@ -785,10 +853,18 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
pFill->node.resultDataOrder = pFill->node.requireDataOrder;
|
||||
pFill->inputTsOrder = ORDER_ASC;
|
||||
|
||||
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) {
|
||||
code = nodesListMakeStrictAppend(&pFill->node.pTargets,
|
||||
nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0)));
|
||||
int32_t code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pFill->pNotFillExprs, &pFill->node.pTargets);
|
||||
}
|
||||
|
||||
pFill->mode = pFillNode->mode;
|
||||
|
|
|
@ -195,7 +195,7 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList,
|
|||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
|
||||
int16_t nextSlotId = taosHashGetSize(pHash), slotId = 0;
|
||||
int16_t nextSlotId = LIST_LENGTH(pDataBlockDesc->pSlots), slotId = 0;
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pList) {
|
||||
SNode* pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
|
||||
|
@ -311,6 +311,10 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
|
|||
|
||||
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
|
||||
SNode** pOutput) {
|
||||
if (NULL == pNode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pRes = nodesCloneNode(pNode);
|
||||
if (NULL == pRes) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -332,6 +336,10 @@ static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
|
|||
|
||||
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
|
||||
const SNodeList* pList, SNodeList** pOutput) {
|
||||
if (NULL == pList) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNodeList* pRes = nodesCloneList(pList);
|
||||
if (NULL == pRes) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1372,14 +1380,23 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
pFill->inputTsOrder = pFillNode->inputTsOrder;
|
||||
|
||||
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
||||
int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->node.pTargets, &pFill->pTargets);
|
||||
int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillExprs, &pFill->pFillExprs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlots(pCxt, pFill->pTargets, pFill->node.pOutputDataBlockDesc);
|
||||
code = addDataBlockSlots(pCxt, pFill->pFillExprs, pFill->node.pOutputDataBlockDesc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pNotFillExprs, &pFill->pNotFillExprs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlot(pCxt, &pFill->pWStartTs, pFill->node.pOutputDataBlockDesc);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
|
||||
pFill->pValues = nodesCloneNode(pFillNode->pValues);
|
||||
|
|
|
@ -45,8 +45,15 @@ TEST_F(PlanIntervalTest, fill) {
|
|||
"WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' "
|
||||
"INTERVAL(10s) FILL(VALUE, 10, 20)");
|
||||
|
||||
run("SELECT COUNT(*) FROM st1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' "
|
||||
"PARTITION BY TBNAME interval(10s) fill(prev)");
|
||||
run("SELECT _WSTART, TBNAME, COUNT(*) FROM st1 "
|
||||
"WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' "
|
||||
"PARTITION BY TBNAME INTERVAL(10s) FILL(PREV)");
|
||||
|
||||
run("SELECT COUNT(c1), MAX(c3), COUNT(c1) FROM t1 "
|
||||
"WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' INTERVAL(10s) FILL(PREV)");
|
||||
|
||||
run("SELECT COUNT(c1) FROM t1 WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' "
|
||||
"PARTITION BY c2 INTERVAL(10s) FILL(PREV) ORDER BY c2");
|
||||
}
|
||||
|
||||
TEST_F(PlanIntervalTest, selectFunc) {
|
||||
|
|
|
@ -467,7 +467,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "catalog memory error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")
|
||||
|
|
|
@ -885,15 +885,15 @@ if $data10 != @20-01-01 01:01:10.000@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1.000000000 then
|
||||
if $data11 != 99.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 1.000000000 then
|
||||
if $data12 != 91.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != -87.000000000 then
|
||||
if $data13 != 90.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -917,15 +917,15 @@ if $data70 != @20-01-01 01:02:10.000@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data71 != 1.000000000 then
|
||||
if $data71 != 99.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data72 != 1.000000000 then
|
||||
if $data72 != 91.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data73 != -87.000000000 then
|
||||
if $data73 != 90.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -994,19 +994,19 @@ if $data10 != @20-01-01 01:01:10.000@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1.000000000 then
|
||||
if $data11 != 99.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 1.000000000 then
|
||||
if $data12 != 91.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != -87.000000000 then
|
||||
if $data13 != 90.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 86 then
|
||||
if $data14 != 89 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -111,13 +111,15 @@ endi
|
|||
if $data12 != -2 then
|
||||
return -1
|
||||
endi
|
||||
if $data13 != -3.00000 then
|
||||
if $data13 != -3 then
|
||||
return -1
|
||||
endi
|
||||
if $data14 != -4.000000000 then
|
||||
if $data14 != -4.00000 then
|
||||
print expect -4.00000, actual: $data14
|
||||
return -1
|
||||
endi
|
||||
if $data15 != -5 then
|
||||
if $data15 != -5.000000000 then
|
||||
print expect -5.000000000, actual: $data15
|
||||
return -1
|
||||
endi
|
||||
if $data31 != -1 then
|
||||
|
@ -126,10 +128,10 @@ endi
|
|||
if $data52 != -2 then
|
||||
return -1
|
||||
endi
|
||||
if $data73 != -3.00000 then
|
||||
if $data73 != -3 then
|
||||
return -1
|
||||
endi
|
||||
if $data74 != -4.000000000 then
|
||||
if $data74 != -4.00000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -1010,6 +1010,7 @@ if $data31 != 9.000000000 then
|
|||
return -1
|
||||
endi
|
||||
if $data41 != 12.500000000 then
|
||||
print expect 12.500000000, actual: $data41
|
||||
return -1
|
||||
endi
|
||||
if $data51 != 16.000000000 then
|
||||
|
|
Loading…
Reference in New Issue