add not fill exprs for fill operator
This commit is contained in:
parent
a60f798ada
commit
d1d6689684
|
@ -325,6 +325,7 @@ typedef struct SFillLogicNode {
|
|||
SNode* pWStartTs;
|
||||
SNode* pValues; // SNodeListNode
|
||||
STimeWindow timeRange;
|
||||
SNodeList* pFillNullExprs;
|
||||
} SFillLogicNode;
|
||||
|
||||
typedef struct SSortLogicNode {
|
||||
|
@ -663,6 +664,7 @@ typedef struct SFillPhysiNode {
|
|||
SNode* pWStartTs; // SColumnNode
|
||||
SNode* pValues; // SNodeListNode
|
||||
STimeWindow timeRange;
|
||||
SNodeList* pFillNullExprs;
|
||||
} SFillPhysiNode;
|
||||
|
||||
typedef SFillPhysiNode SStreamFillPhysiNode;
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SFillColInfo {
|
|||
SExprInfo* pExpr;
|
||||
bool notFillCol; // denote if this column needs fill operation
|
||||
SVariant fillVal;
|
||||
bool fillNull;
|
||||
} SFillColInfo;
|
||||
|
||||
typedef struct SFillLinearInfo {
|
||||
|
@ -125,12 +126,14 @@ void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struc
|
|||
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts);
|
||||
bool taosFillNotStarted(const SFillInfo* pFillInfo);
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
|
||||
const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
|
||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols,
|
||||
int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||
int32_t slotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo,
|
||||
SFillInfo** ppFillInfo);
|
||||
|
||||
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
||||
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||
|
|
|
@ -53,6 +53,7 @@ typedef struct SFillOperatorInfo {
|
|||
SExprInfo* pExprInfo;
|
||||
int32_t numOfExpr;
|
||||
SExprSupp noFillExprSupp;
|
||||
SExprSupp fillNullExprSupp;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
static void destroyFillOperatorInfo(void* param);
|
||||
|
@ -140,6 +141,15 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int
|
|||
code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
|
||||
NULL);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pInfo->fillNullExprSupp.pExprInfo) {
|
||||
pInfo->pRes->info.rows = 0;
|
||||
code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
|
||||
pInfo->fillNullExprSupp.numOfExprs, NULL);
|
||||
}
|
||||
|
||||
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
|
||||
|
||||
_end:
|
||||
|
@ -334,10 +344,11 @@ void destroyFillOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
|
||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
||||
int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
|
||||
SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id,
|
||||
SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) {
|
||||
SFillColInfo* pColInfo =
|
||||
createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode);
|
||||
if (!pColInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
|
@ -348,8 +359,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
// STimeWindow w = {0};
|
||||
// getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
|
||||
pInfo->pFillInfo = NULL;
|
||||
int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
||||
int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval,
|
||||
fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
|
@ -455,6 +466,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
|
||||
&pInfo->fillNullExprSupp.numOfExprs);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
|
@ -482,7 +500,9 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
|
||||
COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
||||
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
|
||||
pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
|
||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||
pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -1201,7 +1201,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
|
||||
(const SNodeListNode*)(pPhyFillNode->pValues));
|
||||
NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
|
||||
if (pFillSup->pAllColInfo == NULL) {
|
||||
code = terrno;
|
||||
lino = __LINE__;
|
||||
|
|
|
@ -39,6 +39,10 @@
|
|||
static int32_t doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
||||
|
||||
static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, int32_t rowIndex, int32_t colIdx) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[colIdx];
|
||||
if (pCol->fillNull) {
|
||||
colDataSetNULL(pDstColInfo, rowIndex);
|
||||
} else {
|
||||
SRowVal* p = NULL;
|
||||
if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||
p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->next : &pFillInfo->prev;
|
||||
|
@ -56,6 +60,7 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo,
|
|||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
|
||||
|
@ -545,9 +550,10 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
|||
return pFillInfo->numOfRows - pFillInfo->index;
|
||||
}
|
||||
|
||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
|
||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
|
||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols,
|
||||
int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||
int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo,
|
||||
SFillInfo** ppFillInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (fillType == TSDB_FILL_NONE) {
|
||||
|
@ -574,7 +580,7 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi
|
|||
|
||||
pFillInfo->type = fillType;
|
||||
pFillInfo->pFillCol = pCol;
|
||||
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols;
|
||||
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols + fillNullCols;
|
||||
pFillInfo->alloc = capacity;
|
||||
pFillInfo->id = id;
|
||||
pFillInfo->interval = *pInterval;
|
||||
|
@ -761,10 +767,11 @@ _end:
|
|||
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; }
|
||||
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNoFillExpr, const struct SNodeListNode* pValNode) {
|
||||
int32_t numOfNoFillExpr, SExprInfo* pFillNullExpr, int32_t numOfFillNullExpr,
|
||||
const struct SNodeListNode* pValNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr, sizeof(SFillColInfo));
|
||||
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr + numOfFillNullExpr, sizeof(SFillColInfo));
|
||||
if (pFillCol == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -797,6 +804,13 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
pFillCol[i + numOfFillExpr].notFillCol = true;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfFillNullExpr; ++i) {
|
||||
SExprInfo* pExprInfo = &pFillNullExpr[i];
|
||||
pFillCol[i + numOfFillExpr + numOfNoFillExpr].pExpr = pExprInfo;
|
||||
pFillCol[i + numOfFillExpr + numOfNoFillExpr].notFillCol = true;
|
||||
pFillCol[i + numOfFillExpr + numOfNoFillExpr].fillNull = true;
|
||||
}
|
||||
|
||||
return pFillCol;
|
||||
|
||||
_end:
|
||||
|
|
|
@ -1147,7 +1147,8 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
|||
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
pInfo->pFillColInfo =
|
||||
createFillColInfo(pExprInfo, numOfExprs, NULL, 0, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
QUERY_CHECK_NULL(pInfo->pFillColInfo, code, lino, _error, terrno);
|
||||
|
||||
pInfo->pLinearInfo = NULL;
|
||||
|
|
|
@ -633,6 +633,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
|
|||
CLONE_NODE_FIELD(pWStartTs);
|
||||
CLONE_NODE_FIELD(pValues);
|
||||
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
|
||||
CLONE_NODE_LIST_FIELD(pFillNullExprs);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2843,6 +2843,7 @@ static const char* jkFillPhysiPlanWStartTs = "WStartTs";
|
|||
static const char* jkFillPhysiPlanValues = "Values";
|
||||
static const char* jkFillPhysiPlanStartTime = "StartTime";
|
||||
static const char* jkFillPhysiPlanEndTime = "EndTime";
|
||||
static const char* jkFillPhysiPlanFillNullExprs = "FillNullExprs";
|
||||
|
||||
static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj;
|
||||
|
@ -2869,6 +2870,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanEndTime, pNode->timeRange.ekey);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkFillPhysiPlanFillNullExprs, pNode->pFillNullExprs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2898,6 +2902,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanEndTime, &pNode->timeRange.ekey);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkFillPhysiPlanFillNullExprs, &pNode->pFillNullExprs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -3326,7 +3326,8 @@ enum {
|
|||
PHY_FILL_CODE_WSTART,
|
||||
PHY_FILL_CODE_VALUES,
|
||||
PHY_FILL_CODE_TIME_RANGE,
|
||||
PHY_FILL_CODE_INPUT_TS_ORDER
|
||||
PHY_FILL_CODE_INPUT_TS_ORDER,
|
||||
PHY_FILL_CODE_FILL_NULL_EXPRS,
|
||||
};
|
||||
|
||||
static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -3351,6 +3352,9 @@ static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_TIME_RANGE, timeWindowToMsg, &pNode->timeRange);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_FILL_NULL_EXPRS, nodeListToMsg, pNode->pFillNullExprs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -3383,6 +3387,9 @@ static int32_t msgToPhysiFillNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_FILL_CODE_TIME_RANGE:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, (void**)&pNode->timeRange);
|
||||
break;
|
||||
case PHY_FILL_CODE_FILL_NULL_EXPRS:
|
||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFillNullExprs);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1469,6 +1469,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pLogicNode->pValues);
|
||||
nodesDestroyList(pLogicNode->pFillExprs);
|
||||
nodesDestroyList(pLogicNode->pNotFillExprs);
|
||||
nodesDestroyList(pLogicNode->pFillNullExprs);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||
|
@ -1634,6 +1635,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyList(pPhyNode->pNotFillExprs);
|
||||
nodesDestroyNode(pPhyNode->pWStartTs);
|
||||
nodesDestroyNode(pPhyNode->pValues);
|
||||
nodesDestroyList(pPhyNode->pFillNullExprs);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
|
||||
|
|
|
@ -1196,117 +1196,21 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
typedef struct SPartFillExprsCtx {
|
||||
typedef struct SCollectFillExprsCtx {
|
||||
SHashObj* pPseudoCols;
|
||||
SNodeList* pFillExprs;
|
||||
SNodeList* pNotFillExprs;
|
||||
bool collectAggFuncs;
|
||||
SNodeList* pAggFuncCols;
|
||||
} SCollectFillExprsCtx;
|
||||
|
||||
typedef struct SWalkFillSubExprCtx {
|
||||
bool hasFillCol;
|
||||
bool hasPseudoWinCol;
|
||||
bool hasGroupKeyCol;
|
||||
SHashObj* pPseudoCols;
|
||||
SCollectFillExprsCtx* pCollectFillCtx;
|
||||
int32_t code;
|
||||
} SPartFillExprsCtx;
|
||||
|
||||
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
|
||||
SPartFillExprsCtx *pCtx = pContext;
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (COLUMN_TYPE_WINDOW_START == pCol->colType || COLUMN_TYPE_WINDOW_END == pCol->colType ||
|
||||
COLUMN_TYPE_WINDOW_DURATION == pCol->colType) {
|
||||
pCtx->hasPseudoWinCol = true;
|
||||
pCtx->code = taosHashPut(pCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES);
|
||||
} else if (COLUMN_TYPE_GROUP_KEY == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType || COLUMN_TYPE_TAG == pCol->colType) {
|
||||
pCtx->hasGroupKeyCol = true;
|
||||
pCtx->code = taosHashPut(pCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES);
|
||||
} else {
|
||||
pCtx->hasFillCol = true;
|
||||
return DEAL_RES_END;
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static void needFillValue(SNode* pNode, SPartFillExprsCtx* pCtx) {
|
||||
nodesWalkExpr(pNode, needFillValueImpl, pCtx);
|
||||
}
|
||||
|
||||
typedef struct SCollectFillExprsCtx {
|
||||
SHashObj* pPseudoCols;
|
||||
int32_t code;
|
||||
SNodeList* pFillExprs;
|
||||
SNodeList* pNotFillExprs;
|
||||
bool skipFillCols;
|
||||
} SCollectFillExprsCtx;
|
||||
|
||||
static EDealRes collectFillExpr(SNode* pNode, void* pContext) {
|
||||
SCollectFillExprsCtx* pCollectFillCtx = pContext;
|
||||
SPartFillExprsCtx partFillCtx = {0};
|
||||
SNode* pNew = NULL;
|
||||
partFillCtx.pPseudoCols = pCollectFillCtx->pPseudoCols;
|
||||
needFillValue(pNode, &partFillCtx);
|
||||
if (partFillCtx.code != TSDB_CODE_SUCCESS) {
|
||||
pCollectFillCtx->code = partFillCtx.code;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
if (partFillCtx.hasFillCol && !pCollectFillCtx->skipFillCols) {
|
||||
if (nodeType(pNode) == QUERY_NODE_ORDER_BY_EXPR) {
|
||||
pCollectFillCtx->code = nodesCloneNode(((SOrderByExprNode*)pNode)->pExpr, &pNew);
|
||||
} else {
|
||||
pCollectFillCtx->code = nodesCloneNode(pNode, &pNew);
|
||||
}
|
||||
if (pCollectFillCtx->code == TSDB_CODE_SUCCESS) {
|
||||
pCollectFillCtx->code = nodesListMakeStrictAppend(&pCollectFillCtx->pFillExprs, pNew);
|
||||
}
|
||||
if (pCollectFillCtx->code != TSDB_CODE_SUCCESS) return DEAL_RES_ERROR;
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t collectFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCollectFillExprsCtx collectFillCtx = {0};
|
||||
collectFillCtx.pPseudoCols = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (!collectFillCtx.pPseudoCols) return terrno;
|
||||
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
nodesWalkExprs(pSelect->pProjectionList, collectFillExpr, &collectFillCtx);
|
||||
}
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
collectFillCtx.skipFillCols = true;
|
||||
nodesWalkExpr(pSelect->pHaving, collectFillExpr, &collectFillCtx);
|
||||
}
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
nodesWalkExprs(pSelect->pGroupByList, collectFillExpr, &collectFillCtx);
|
||||
}
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
nodesWalkExprs(pSelect->pOrderByList, collectFillExpr, &collectFillCtx);
|
||||
}
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
void* pIter = taosHashIterate(collectFillCtx.pPseudoCols, 0);
|
||||
while (pIter) {
|
||||
SNode* pNode = *(SNode**)pIter, *pNew = NULL;
|
||||
collectFillCtx.code = nodesCloneNode(pNode, &pNew);
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
collectFillCtx.code = nodesListMakeStrictAppend(&collectFillCtx.pNotFillExprs, pNew);
|
||||
}
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
pIter = taosHashIterate(collectFillCtx.pPseudoCols, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(collectFillCtx.pPseudoCols, pIter);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (collectFillCtx.code == TSDB_CODE_SUCCESS) {
|
||||
TSWAP(*pFillExprs, collectFillCtx.pFillExprs);
|
||||
TSWAP(*pNotFillExprs, collectFillCtx.pNotFillExprs);
|
||||
}
|
||||
}
|
||||
if (collectFillCtx.code != TSDB_CODE_SUCCESS) {
|
||||
if (collectFillCtx.pFillExprs) nodesDestroyList(collectFillCtx.pFillExprs);
|
||||
if (collectFillCtx.pNotFillExprs) nodesDestroyList(collectFillCtx.pNotFillExprs);
|
||||
}
|
||||
taosHashCleanup(collectFillCtx.pPseudoCols);
|
||||
return code;
|
||||
}
|
||||
} SWalkFillSubExprCtx;
|
||||
|
||||
static bool nodeAlreadyContained(SNodeList* pList, SNode* pNode) {
|
||||
SNode* pExpr = NULL;
|
||||
|
@ -1318,6 +1222,116 @@ static bool nodeAlreadyContained(SNodeList* pList, SNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
|
||||
SWalkFillSubExprCtx *pCtx = pContext;
|
||||
EDealRes res = DEAL_RES_CONTINUE;
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (COLUMN_TYPE_WINDOW_START == pCol->colType || COLUMN_TYPE_WINDOW_END == pCol->colType ||
|
||||
COLUMN_TYPE_WINDOW_DURATION == pCol->colType) {
|
||||
pCtx->hasPseudoWinCol = true;
|
||||
pCtx->code =
|
||||
taosHashPut(pCtx->pCollectFillCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES);
|
||||
} else if (COLUMN_TYPE_GROUP_KEY == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType ||
|
||||
COLUMN_TYPE_TAG == pCol->colType) {
|
||||
pCtx->hasGroupKeyCol = true;
|
||||
pCtx->code =
|
||||
taosHashPut(pCtx->pCollectFillCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES);
|
||||
} else {
|
||||
pCtx->hasFillCol = true;
|
||||
if (pCtx->pCollectFillCtx->collectAggFuncs) {
|
||||
// Agg funcs has already been rewriten to columns by Interval
|
||||
// Here, we return DEAL_RES_CONTINUE cause we need to collect all agg funcs
|
||||
if (!nodeAlreadyContained(pCtx->pCollectFillCtx->pFillExprs, pNode) &&
|
||||
!nodeAlreadyContained(pCtx->pCollectFillCtx->pAggFuncCols, pNode))
|
||||
pCtx->code = nodesListMakeStrictAppend(&pCtx->pCollectFillCtx->pAggFuncCols, pNode);
|
||||
} else {
|
||||
res = DEAL_RES_END;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pCtx->code != TSDB_CODE_SUCCESS) res = DEAL_RES_ERROR;
|
||||
return res;
|
||||
}
|
||||
|
||||
static void needFillValue(SNode* pNode, SWalkFillSubExprCtx* pCtx) {
|
||||
nodesWalkExpr(pNode, needFillValueImpl, pCtx);
|
||||
}
|
||||
|
||||
static int32_t collectFillExpr(SNode* pNode, SCollectFillExprsCtx* pCollectFillCtx) {
|
||||
SNode* pNew = NULL;
|
||||
SWalkFillSubExprCtx collectFillSubExprCtx = {
|
||||
.hasFillCol = false, .hasPseudoWinCol = false, .hasGroupKeyCol = false, .pCollectFillCtx = pCollectFillCtx};
|
||||
needFillValue(pNode, &collectFillSubExprCtx);
|
||||
if (collectFillSubExprCtx.code != TSDB_CODE_SUCCESS) {
|
||||
return collectFillSubExprCtx.code;
|
||||
}
|
||||
|
||||
if (collectFillSubExprCtx.hasFillCol && !pCollectFillCtx->collectAggFuncs) {
|
||||
if (nodeType(pNode) == QUERY_NODE_ORDER_BY_EXPR) {
|
||||
collectFillSubExprCtx.code = nodesCloneNode(((SOrderByExprNode*)pNode)->pExpr, &pNew);
|
||||
} else {
|
||||
collectFillSubExprCtx.code = nodesCloneNode(pNode, &pNew);
|
||||
}
|
||||
if (collectFillSubExprCtx.code == TSDB_CODE_SUCCESS) {
|
||||
collectFillSubExprCtx.code = nodesListMakeStrictAppend(&pCollectFillCtx->pFillExprs, pNew);
|
||||
}
|
||||
}
|
||||
return collectFillSubExprCtx.code;
|
||||
}
|
||||
|
||||
static int32_t collectFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs,
|
||||
SNodeList** pPossibleFillNullCols) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCollectFillExprsCtx collectFillCtx = {0};
|
||||
SNode* pNode = NULL;
|
||||
collectFillCtx.pPseudoCols = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (!collectFillCtx.pPseudoCols) return terrno;
|
||||
|
||||
FOREACH(pNode, pSelect->pProjectionList) {
|
||||
code = collectFillExpr(pNode, &collectFillCtx);
|
||||
if (code != TSDB_CODE_SUCCESS) break;
|
||||
}
|
||||
collectFillCtx.collectAggFuncs = true;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = collectFillExpr(pSelect->pHaving, &collectFillCtx);
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
FOREACH(pNode, pSelect->pOrderByList) {
|
||||
code = collectFillExpr(pNode, &collectFillCtx);
|
||||
if (code != TSDB_CODE_SUCCESS) break;
|
||||
}
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
void* pIter = taosHashIterate(collectFillCtx.pPseudoCols, 0);
|
||||
while (pIter) {
|
||||
SNode* pNode = *(SNode**)pIter, *pNew = NULL;
|
||||
code = nodesCloneNode(pNode, &pNew);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = nodesListMakeStrictAppend(&collectFillCtx.pNotFillExprs, pNew);
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pIter = taosHashIterate(collectFillCtx.pPseudoCols, pIter);
|
||||
} else {
|
||||
taosHashCancelIterate(collectFillCtx.pPseudoCols, pIter);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
TSWAP(*pFillExprs, collectFillCtx.pFillExprs);
|
||||
TSWAP(*pNotFillExprs, collectFillCtx.pNotFillExprs);
|
||||
TSWAP(*pPossibleFillNullCols, collectFillCtx.pAggFuncCols);
|
||||
}
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (collectFillCtx.pFillExprs) nodesDestroyList(collectFillCtx.pFillExprs);
|
||||
if (collectFillCtx.pNotFillExprs) nodesDestroyList(collectFillCtx.pNotFillExprs);
|
||||
if (collectFillCtx.pAggFuncCols) nodesDestroyList(collectFillCtx.pAggFuncCols);
|
||||
}
|
||||
taosHashCleanup(collectFillCtx.pPseudoCols);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||
if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) ||
|
||||
NULL == ((SIntervalWindowNode*)pSelect->pWindow)->pFill) {
|
||||
|
@ -1340,33 +1354,15 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
pFill->node.resultDataOrder = pFill->node.requireDataOrder;
|
||||
pFill->node.inputTsOrder = TSDB_ORDER_ASC;
|
||||
|
||||
code = collectFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs);
|
||||
code = collectFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs, &pFill->pFillNullExprs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
||||
}
|
||||
SNodeList* pWindowTargets = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SNode* pNode = NULL, *pNodeNew = NULL;
|
||||
FOREACH(pNode, pCxt->pCurrRoot->pTargets) {
|
||||
if (nodesEqualNode(pNode, pFillNode->pWStartTs)) continue;
|
||||
if (nodeAlreadyContained(pFill->pFillExprs, pNode)) continue;
|
||||
if (nodeAlreadyContained(pFill->pNotFillExprs, pNode)) continue;
|
||||
pNodeNew = NULL;
|
||||
code = nodesCloneNode(pNode, &pNodeNew);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pWindowTargets, pNodeNew);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(pWindowTargets);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pWindowTargets) > 0) {
|
||||
code = nodesListMakeStrictAppendList(&pFill->pFillExprs, pWindowTargets);
|
||||
if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFill->pFillNullExprs) > 0) {
|
||||
code = createColumnByRewriteExprs(pFill->pFillNullExprs, &pFill->node.pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets);
|
||||
|
|
|
@ -2512,6 +2512,12 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFillNode->pFillNullExprs) > 0) {
|
||||
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillNullExprs, &pFill->pFillNullExprs);
|
||||
if (TSDB_CODE_SUCCESS == code ) {
|
||||
code = addDataBlockSlots(pCxt, pFill->pFillNullExprs, pFill->node.pOutputDataBlockDesc);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
|
||||
|
|
|
@ -295,10 +295,14 @@ class TDTestCase:
|
|||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkRows(48)
|
||||
|
||||
sql = "SELECT count(*) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) having(timediff(last(ts), _wstart) >= 0)"
|
||||
sql = "SELECT count(*) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart) >= 0)"
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkRows(60)
|
||||
|
||||
sql = "SELECT count(*) + 1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(count(*) > 1)"
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkRows(0)
|
||||
|
||||
def run(self):
|
||||
self.prepareTestEnv()
|
||||
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()
|
||||
|
|
Loading…
Reference in New Issue