fix(query): support scalar function in fill operator.
This commit is contained in:
parent
d1c6835eda
commit
a74c830125
|
@ -627,12 +627,15 @@ typedef struct SFillOperatorInfo {
|
|||
int64_t totalInputRows;
|
||||
void** p;
|
||||
SSDataBlock* existNewGroupBlock;
|
||||
bool multigroupResult;
|
||||
STimeWindow win;
|
||||
SNode* pCondition;
|
||||
SArray* pColMatchColInfo;
|
||||
int32_t primaryTsCol;
|
||||
uint64_t curGroupId; // current handled group id
|
||||
SExprInfo* pExprInfo;
|
||||
int32_t numOfExpr;
|
||||
SExprInfo* pNotFillExprInfo;
|
||||
int32_t numOfNotFillExpr;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct SFillInfo {
|
|||
TSKEY end; // endKey for fill
|
||||
TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure.
|
||||
int32_t tsSlotId; // primary time stamp slot id
|
||||
int32_t srcTsSlotId; // timestamp column id in the source data block.
|
||||
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
|
||||
int32_t type; // fill type
|
||||
int32_t numOfRows; // number of rows in the input data block
|
||||
|
|
|
@ -3244,7 +3244,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -3252,6 +3252,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
} else {
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
|
||||
pInfo->curGroupId = pBlock->info.groupId; // the first data block
|
||||
|
||||
|
@ -3629,10 +3632,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0, num1 = 0;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &num);
|
||||
SExprInfo* pCopyColumnExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &num1);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
SExprInfo* pCopyColumnExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
||||
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
|
@ -3645,6 +3647,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||
initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
|
||||
|
||||
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
|
||||
|
@ -3652,9 +3655,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
int32_t code = initFillInfo(pInfo, pExprInfo, num, pCopyColumnExprInfo, num1,
|
||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
|
||||
int32_t code =
|
||||
initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pCopyColumnExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues,
|
||||
pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -3667,7 +3670,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
|
|
@ -258,7 +258,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray
|
|||
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
||||
pFillInfo->numOfCurrent = 0;
|
||||
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->tsSlotId);
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
||||
|
@ -349,10 +349,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
}
|
||||
|
||||
if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
|
||||
/* the raw data block is exhausted, next value does not exists */
|
||||
// if (pFillInfo->index >= pFillInfo->numOfRows) {
|
||||
// taosMemoryFreeClear(*next);
|
||||
// }
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
return pFillInfo->numOfCurrent;
|
||||
}
|
||||
|
@ -413,7 +409,17 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t
|
|||
}
|
||||
|
||||
pFillInfo->order = order;
|
||||
pFillInfo->tsSlotId = primaryTsSlotId;
|
||||
pFillInfo->srcTsSlotId = primaryTsSlotId;
|
||||
|
||||
for(int32_t i = 0; i < numOfNotFillCols; ++i) {
|
||||
SFillColInfo* p = &pCol[i + numOfFillCols];
|
||||
int32_t srcSlotId = GET_SRC_SLOT_ID(p);
|
||||
if (srcSlotId == primaryTsSlotId) {
|
||||
pFillInfo->tsSlotId = i + numOfFillCols;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosResetFillInfo(pFillInfo, skey);
|
||||
|
||||
switch (fillType) {
|
||||
|
@ -531,7 +537,7 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
|
|||
}
|
||||
|
||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
|
||||
SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
||||
|
||||
int64_t* tsList = (int64_t*)pCol->pData;
|
||||
int32_t numOfRows = taosNumOfRemainRows(pFillInfo);
|
||||
|
@ -619,9 +625,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
}
|
||||
|
||||
if (pExprInfo->base.numOfParams > 0) {
|
||||
// if (pExprInfo->base.numOfParams > 0) {
|
||||
// pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < numOfNotFillExpr; ++i) {
|
||||
|
|
|
@ -1010,6 +1010,7 @@ if $data31 != 9.000000000 then
|
|||
return -1
|
||||
endi
|
||||
if $data41 != 12.500000000 then
|
||||
print expect 12.500000000, actual: $data41
|
||||
return -1
|
||||
endi
|
||||
if $data51 != 16.000000000 then
|
||||
|
|
Loading…
Reference in New Issue