diff --git a/docs/en/14-reference/03-taos-sql/12-distinguished.md b/docs/en/14-reference/03-taos-sql/12-distinguished.md index bfc9ca32c0..2374b762d4 100644 --- a/docs/en/14-reference/03-taos-sql/12-distinguished.md +++ b/docs/en/14-reference/03-taos-sql/12-distinguished.md @@ -80,7 +80,7 @@ These pseudocolumns occur after the aggregation clause. `FILL` clause is used to specify how to fill when there is data missing in any window, including: 1. NONE: No fill (the default fill mode) -2. VALUE: Fill with a fixed value, which should be specified together, for example `FILL(VALUE, 1.23)` Note: The value filled depends on the data type. For example, if you run FILL(VALUE 1.23) on an integer column, the value 1 is filled. If multiple columns in select list need to be filled, then in the fill clause there must be a fill value for each of these columns, for example, `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`. +2. VALUE: Fill with a fixed value, which should be specified together, for example `FILL(VALUE, 1.23)` Note: The value filled depends on the data type. For example, if you run FILL(VALUE 1.23) on an integer column, the value 1 is filled. If multiple columns in select list need to be filled, then in the fill clause there must be a fill value for each of these columns, for example, `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`. Note that only exprs in select list that contains normal cols need to specify fill value, exprs like `_wstart`, `_wend`, `_wduration`, `_wstart + 1a`, `now`, `1+1`, partition keys like tbname(when using partition by) don't need to specify fill value. But exprs like `timediff(last(ts), _wstart)` need to specify fill value. 3. PREV: Fill with the previous non-NULL value, `FILL(PREV)` 4. NULL: Fill with NULL, `FILL(NULL)` 5. LINEAR: Fill with the closest non-NULL value, `FILL(LINEAR)` diff --git a/docs/zh/14-reference/03-taos-sql/12-distinguished.md b/docs/zh/14-reference/03-taos-sql/12-distinguished.md index e149c2c82e..d7696b1859 100644 --- a/docs/zh/14-reference/03-taos-sql/12-distinguished.md +++ b/docs/zh/14-reference/03-taos-sql/12-distinguished.md @@ -76,7 +76,7 @@ window_clause: { FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种: 1. 不进行填充:NONE(默认填充模式)。 -2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如 FILL(VALUE, 1.23),相应列为 INT 类型,则填充值为 1, 若查询列表中有多列需要FILL, 则需要给每一个FILL列指定VALUE, 如`SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`。 +2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如 FILL(VALUE, 1.23),相应列为 INT 类型,则填充值为 1, 若查询列表中有多列需要 FILL, 则需要给每一个 FILL 列指定 VALUE, 如 `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`, 注意, SELECT 表达式中只有包含普通列时才需要指定 FILL VALUE, 如 `_wstart`, `_wstart+1a`, `now`, `1+1` 以及使用 partition by 时的 partition key (如 tbname)都不需要指定 VALUE, 如 `timediff(last(ts), _wstart)` 则需要指定VALUE。 3. PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。 4. NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。 5. LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。 diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 8e4a3ea32b..a05211ece3 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -333,6 +333,7 @@ typedef struct SFillLogicNode { SNode* pWStartTs; SNode* pValues; // SNodeListNode STimeWindow timeRange; + SNodeList* pFillNullExprs; } SFillLogicNode; typedef struct SSortLogicNode { @@ -677,6 +678,7 @@ typedef struct SFillPhysiNode { SNode* pWStartTs; // SColumnNode SNode* pValues; // SNodeListNode STimeWindow timeRange; + SNodeList* pFillNullExprs; } SFillPhysiNode; typedef SFillPhysiNode SStreamFillPhysiNode; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 488b94eb20..5ab5500fa6 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -56,7 +56,7 @@ int32_t tsShellActivityTimer = 3; // second // queue & threads int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; -int32_t tsShareConnLimit = 8; +int32_t tsShareConnLimit = 10; int32_t tsReadTimeout = 900; int32_t tsTimeToGetAvailableConn = 500000; int32_t tsKeepAliveIdle = 60; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index b06aa7d1c8..31ac5689f6 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -35,6 +35,7 @@ typedef struct SFillColInfo { SExprInfo* pExpr; bool notFillCol; // denote if this column needs fill operation SVariant fillVal; + bool fillNull; } SFillColInfo; typedef struct SFillLinearInfo { @@ -125,12 +126,14 @@ void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struc void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts); bool taosFillNotStarted(const SFillInfo* pFillInfo); SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr, - int32_t numOfNotFillCols, const struct SNodeListNode* val); + int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs, + const struct SNodeListNode* val); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); -int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, - int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo); +int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols, + int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, + int32_t slotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo, + SFillInfo** ppFillInfo); void* taosDestroyFillInfo(struct SFillInfo* pFillInfo); int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index d530382f7c..1595c90419 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -53,6 +53,7 @@ typedef struct SFillOperatorInfo { SExprInfo* pExprInfo; int32_t numOfExpr; SExprSupp noFillExprSupp; + SExprSupp fillNullExprSupp; } SFillOperatorInfo; static void destroyFillOperatorInfo(void* param); @@ -140,6 +141,15 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs, NULL); QUERY_CHECK_CODE(code, lino, _end); + + if (pInfo->fillNullExprSupp.pExprInfo) { + pInfo->pRes->info.rows = 0; + code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false); + QUERY_CHECK_CODE(code, lino, _end); + code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx, + pInfo->fillNullExprSupp.numOfExprs, NULL); + } + pInfo->pRes->info.id.groupId = pBlock->info.id.groupId; _end: @@ -327,6 +337,7 @@ void destroyFillOperatorInfo(void* param) { pInfo->pFinalRes = NULL; cleanupExprSupp(&pInfo->noFillExprSupp); + cleanupExprSupp(&pInfo->fillNullExprSupp); taosMemoryFreeClear(pInfo->p); taosArrayDestroy(pInfo->matchInfo.pList); @@ -334,10 +345,11 @@ void destroyFillOperatorInfo(void* param) { } static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr, - int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity, - const char* id, SInterval* pInterval, int32_t fillType, int32_t order, - SExecTaskInfo* pTaskInfo) { - SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); + int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs, + SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id, + SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) { + SFillColInfo* pColInfo = + createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode); if (!pColInfo) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); return terrno; @@ -348,8 +360,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t // STimeWindow w = {0}; // getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC); pInfo->pFillInfo = NULL; - int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, - pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo); + int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval, + fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; @@ -455,6 +467,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); + code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo, + &pInfo->fillNullExprSupp.numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs, + &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval @@ -482,7 +501,9 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); + QUERY_CHECK_CODE(code, lino, _error); code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, + pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 826220581a..b7061fad97 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1201,7 +1201,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod QUERY_CHECK_CODE(code, lino, _end); pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols, - (const SNodeListNode*)(pPhyFillNode->pValues)); + NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues)); if (pFillSup->pAllColInfo == NULL) { code = terrno; lino = __LINE__; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index cdfbd7a850..190b327522 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -39,22 +39,27 @@ static int32_t doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, int32_t rowIndex, int32_t colIdx) { - SRowVal* p = NULL; - if (pFillInfo->type == TSDB_FILL_NEXT) { - p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->next : &pFillInfo->prev; + SFillColInfo* pCol = &pFillInfo->pFillCol[colIdx]; + if (pCol->fillNull) { + colDataSetNULL(pDstColInfo, rowIndex); } else { - p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->prev : &pFillInfo->next; - } + SRowVal* p = NULL; + if (pFillInfo->type == TSDB_FILL_NEXT) { + p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->next : &pFillInfo->prev; + } else { + p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->prev : &pFillInfo->next; + } - SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx); - if (!pKey) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); - T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno); - } - int32_t code = doSetVal(pDstColInfo, rowIndex, pKey); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pFillInfo->pTaskInfo->env, code); + SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx); + if (!pKey) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno); + } + int32_t code = doSetVal(pDstColInfo, rowIndex, pKey); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pFillInfo->pTaskInfo->env, code); + } } } @@ -545,9 +550,10 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { return pFillInfo->numOfRows - pFillInfo->index; } -int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId, - int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) { +int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols, + int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, + int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo, + SFillInfo** ppFillInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (fillType == TSDB_FILL_NONE) { @@ -574,7 +580,7 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi pFillInfo->type = fillType; pFillInfo->pFillCol = pCol; - pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols; + pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols + fillNullCols; pFillInfo->alloc = capacity; pFillInfo->id = id; pFillInfo->interval = *pInterval; @@ -761,10 +767,11 @@ _end: int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; } SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr, - int32_t numOfNoFillExpr, const struct SNodeListNode* pValNode) { + int32_t numOfNoFillExpr, SExprInfo* pFillNullExpr, int32_t numOfFillNullExpr, + const struct SNodeListNode* pValNode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr, sizeof(SFillColInfo)); + SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr + numOfFillNullExpr, sizeof(SFillColInfo)); if (pFillCol == NULL) { return NULL; } @@ -797,6 +804,13 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn pFillCol[i + numOfFillExpr].notFillCol = true; } + for (int32_t i = 0; i < numOfFillNullExpr; ++i) { + SExprInfo* pExprInfo = &pFillNullExpr[i]; + pFillCol[i + numOfFillExpr + numOfNoFillExpr].pExpr = pExprInfo; + pFillCol[i + numOfFillExpr + numOfNoFillExpr].notFillCol = true; + pFillCol[i + numOfFillExpr + numOfNoFillExpr].fillNull = true; + } + return pFillCol; _end: diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 2ea300ace8..6803f40da4 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1147,7 +1147,8 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); + pInfo->pFillColInfo = + createFillColInfo(pExprInfo, numOfExprs, NULL, 0, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); QUERY_CHECK_NULL(pInfo->pFillColInfo, code, lino, _error, terrno); pInfo->pLinearInfo = NULL; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b77ffb8d2c..b2b06e6bea 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -642,6 +642,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) { CLONE_NODE_FIELD(pWStartTs); CLONE_NODE_FIELD(pValues); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); + CLONE_NODE_LIST_FIELD(pFillNullExprs); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0f67493094..723a38c2bd 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2887,6 +2887,7 @@ static const char* jkFillPhysiPlanWStartTs = "WStartTs"; static const char* jkFillPhysiPlanValues = "Values"; static const char* jkFillPhysiPlanStartTime = "StartTime"; static const char* jkFillPhysiPlanEndTime = "EndTime"; +static const char* jkFillPhysiPlanFillNullExprs = "FillNullExprs"; static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) { const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj; @@ -2913,6 +2914,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanEndTime, pNode->timeRange.ekey); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkFillPhysiPlanFillNullExprs, pNode->pFillNullExprs); + } return code; } @@ -2942,6 +2946,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanEndTime, &pNode->timeRange.ekey); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkFillPhysiPlanFillNullExprs, &pNode->pFillNullExprs); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 3d8a57363b..7e3e0e0806 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3326,7 +3326,8 @@ enum { PHY_FILL_CODE_WSTART, PHY_FILL_CODE_VALUES, PHY_FILL_CODE_TIME_RANGE, - PHY_FILL_CODE_INPUT_TS_ORDER + PHY_FILL_CODE_INPUT_TS_ORDER, + PHY_FILL_CODE_FILL_NULL_EXPRS, }; static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -3351,6 +3352,9 @@ static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_TIME_RANGE, timeWindowToMsg, &pNode->timeRange); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_FILL_NULL_EXPRS, nodeListToMsg, pNode->pFillNullExprs); + } return code; } @@ -3383,6 +3387,9 @@ static int32_t msgToPhysiFillNode(STlvDecoder* pDecoder, void* pObj) { case PHY_FILL_CODE_TIME_RANGE: code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, (void**)&pNode->timeRange); break; + case PHY_FILL_CODE_FILL_NULL_EXPRS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFillNullExprs); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 31568149c9..6830dd01b6 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1495,6 +1495,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pValues); nodesDestroyList(pLogicNode->pFillExprs); nodesDestroyList(pLogicNode->pNotFillExprs); + nodesDestroyList(pLogicNode->pFillNullExprs); break; } case QUERY_NODE_LOGIC_PLAN_SORT: { @@ -1666,6 +1667,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pPhyNode->pNotFillExprs); nodesDestroyNode(pPhyNode->pWStartTs); nodesDestroyNode(pPhyNode->pValues); + nodesDestroyList(pPhyNode->pFillNullExprs); break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 40cd415cb9..c579b66511 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1279,71 +1279,139 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele return TSDB_CODE_FAILED; } +typedef struct SCollectFillExprsCtx { + SHashObj* pPseudoCols; + SNodeList* pFillExprs; + SNodeList* pNotFillExprs; + bool collectAggFuncs; + SNodeList* pAggFuncCols; +} SCollectFillExprsCtx; + +typedef struct SWalkFillSubExprCtx { + bool hasFillCol; + bool hasPseudoWinCol; + bool hasGroupKeyCol; + SCollectFillExprsCtx* pCollectFillCtx; + int32_t code; +} SWalkFillSubExprCtx; + +static bool nodeAlreadyContained(SNodeList* pList, SNode* pNode) { + SNode* pExpr = NULL; + FOREACH(pExpr, pList) { + if (nodesEqualNode(pExpr, pNode)) { + return true; + } + } + return false; +} + static EDealRes needFillValueImpl(SNode* pNode, void* pContext) { + SWalkFillSubExprCtx *pCtx = pContext; + EDealRes res = DEAL_RES_CONTINUE; if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; - if (COLUMN_TYPE_WINDOW_START != pCol->colType && COLUMN_TYPE_WINDOW_END != pCol->colType && - COLUMN_TYPE_WINDOW_DURATION != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) { - *(bool*)pContext = true; - return DEAL_RES_END; + if (COLUMN_TYPE_WINDOW_START == pCol->colType || COLUMN_TYPE_WINDOW_END == pCol->colType || + COLUMN_TYPE_WINDOW_DURATION == pCol->colType) { + pCtx->hasPseudoWinCol = true; + pCtx->code = + taosHashPut(pCtx->pCollectFillCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES); + } else if (COLUMN_TYPE_GROUP_KEY == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType || + COLUMN_TYPE_TAG == pCol->colType) { + pCtx->hasGroupKeyCol = true; + pCtx->code = + taosHashPut(pCtx->pCollectFillCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES); + } else { + pCtx->hasFillCol = true; + if (pCtx->pCollectFillCtx->collectAggFuncs) { + // Agg funcs has already been rewriten to columns by Interval + // Here, we return DEAL_RES_CONTINUE cause we need to collect all agg funcs + if (!nodeAlreadyContained(pCtx->pCollectFillCtx->pFillExprs, pNode) && + !nodeAlreadyContained(pCtx->pCollectFillCtx->pAggFuncCols, pNode)) + pCtx->code = nodesListMakeStrictAppend(&pCtx->pCollectFillCtx->pAggFuncCols, pNode); + } else { + res = DEAL_RES_END; + } } } - return DEAL_RES_CONTINUE; + if (pCtx->code != TSDB_CODE_SUCCESS) res = DEAL_RES_ERROR; + return res; } -static bool needFillValue(SNode* pNode) { - bool hasFillCol = false; - nodesWalkExpr(pNode, needFillValueImpl, &hasFillCol); - return hasFillCol; +static void needFillValue(SNode* pNode, SWalkFillSubExprCtx* pCtx) { + nodesWalkExpr(pNode, needFillValueImpl, pCtx); } -static int32_t partFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs) { - int32_t code = TSDB_CODE_SUCCESS; - SNode* pProject = NULL; - FOREACH(pProject, pSelect->pProjectionList) { - if (needFillValue(pProject)) { - SNode* pNew = NULL; - code = nodesCloneNode(pProject, &pNew); - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(pFillExprs, pNew); - } - } else if (QUERY_NODE_VALUE != nodeType(pProject)) { - SNode* pNew = NULL; - code = nodesCloneNode(pProject, &pNew); - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(pNotFillExprs, pNew); - } +static int32_t collectFillExpr(SNode* pNode, SCollectFillExprsCtx* pCollectFillCtx) { + SNode* pNew = NULL; + SWalkFillSubExprCtx collectFillSubExprCtx = { + .hasFillCol = false, .hasPseudoWinCol = false, .hasGroupKeyCol = false, .pCollectFillCtx = pCollectFillCtx}; + needFillValue(pNode, &collectFillSubExprCtx); + if (collectFillSubExprCtx.code != TSDB_CODE_SUCCESS) { + return collectFillSubExprCtx.code; + } + + if (collectFillSubExprCtx.hasFillCol && !pCollectFillCtx->collectAggFuncs) { + if (nodeType(pNode) == QUERY_NODE_ORDER_BY_EXPR) { + collectFillSubExprCtx.code = nodesCloneNode(((SOrderByExprNode*)pNode)->pExpr, &pNew); + } else { + collectFillSubExprCtx.code = nodesCloneNode(pNode, &pNew); } - if (TSDB_CODE_SUCCESS != code) { - NODES_DESTORY_LIST(*pFillExprs); - NODES_DESTORY_LIST(*pNotFillExprs); - break; + if (collectFillSubExprCtx.code == TSDB_CODE_SUCCESS) { + collectFillSubExprCtx.code = nodesListMakeStrictAppend(&pCollectFillCtx->pFillExprs, pNew); } } - if (!pSelect->isDistinct) { - SNode* pOrderExpr = NULL; - FOREACH(pOrderExpr, pSelect->pOrderByList) { - SNode* pExpr = ((SOrderByExprNode*)pOrderExpr)->pExpr; - if (needFillValue(pExpr)) { - SNode* pNew = NULL; - code = nodesCloneNode(pExpr, &pNew); - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(pFillExprs, pNew); - } - } else if (QUERY_NODE_VALUE != nodeType(pExpr)) { - SNode* pNew = NULL; - code = nodesCloneNode(pExpr, &pNew); - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(pNotFillExprs, pNew); - } + return collectFillSubExprCtx.code; +} + +static int32_t collectFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs, + SNodeList** pPossibleFillNullCols) { + int32_t code = TSDB_CODE_SUCCESS; + SCollectFillExprsCtx collectFillCtx = {0}; + SNode* pNode = NULL; + collectFillCtx.pPseudoCols = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (!collectFillCtx.pPseudoCols) return terrno; + + FOREACH(pNode, pSelect->pProjectionList) { + code = collectFillExpr(pNode, &collectFillCtx); + if (code != TSDB_CODE_SUCCESS) break; + } + collectFillCtx.collectAggFuncs = true; + if (code == TSDB_CODE_SUCCESS) { + code = collectFillExpr(pSelect->pHaving, &collectFillCtx); + } + if (code == TSDB_CODE_SUCCESS) { + FOREACH(pNode, pSelect->pOrderByList) { + code = collectFillExpr(pNode, &collectFillCtx); + if (code != TSDB_CODE_SUCCESS) break; + } + } + if (code == TSDB_CODE_SUCCESS) { + void* pIter = taosHashIterate(collectFillCtx.pPseudoCols, 0); + while (pIter) { + SNode* pNode = *(SNode**)pIter, *pNew = NULL; + code = nodesCloneNode(pNode, &pNew); + if (code == TSDB_CODE_SUCCESS) { + code = nodesListMakeStrictAppend(&collectFillCtx.pNotFillExprs, pNew); } - if (TSDB_CODE_SUCCESS != code) { - NODES_DESTORY_LIST(*pFillExprs); - NODES_DESTORY_LIST(*pNotFillExprs); + if (code == TSDB_CODE_SUCCESS) { + pIter = taosHashIterate(collectFillCtx.pPseudoCols, pIter); + } else { + taosHashCancelIterate(collectFillCtx.pPseudoCols, pIter); break; } } + if (code == TSDB_CODE_SUCCESS) { + TSWAP(*pFillExprs, collectFillCtx.pFillExprs); + TSWAP(*pNotFillExprs, collectFillCtx.pNotFillExprs); + TSWAP(*pPossibleFillNullCols, collectFillCtx.pAggFuncCols); + } } + if (code != TSDB_CODE_SUCCESS) { + if (collectFillCtx.pFillExprs) nodesDestroyList(collectFillCtx.pFillExprs); + if (collectFillCtx.pNotFillExprs) nodesDestroyList(collectFillCtx.pNotFillExprs); + if (collectFillCtx.pAggFuncCols) nodesDestroyList(collectFillCtx.pAggFuncCols); + } + taosHashCleanup(collectFillCtx.pPseudoCols); return code; } @@ -1369,13 +1437,16 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pFill->node.resultDataOrder = pFill->node.requireDataOrder; pFill->node.inputTsOrder = TSDB_ORDER_ASC; - code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs); + code = collectFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs, &pFill->pFillNullExprs); if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL); } if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL); } + if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFill->pFillNullExprs) > 0) { + code = createColumnByRewriteExprs(pFill->pFillNullExprs, &pFill->node.pTargets); + } if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 738ccf3224..f05966802b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -2605,6 +2605,12 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc); } + if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFillNode->pFillNullExprs) > 0) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillNullExprs, &pFill->pFillNullExprs); + if (TSDB_CODE_SUCCESS == code ) { + code = addDataBlockSlots(pCxt, pFill->pFillNullExprs, pFill->node.pOutputDataBlockDesc); + } + } if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9e0e6a0d24..c3e214b5e3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,7 +27,7 @@ typedef struct { typedef struct SConnList { queue conns; int32_t size; - int32_t totaSize; + int32_t totalSize; } SConnList; typedef struct { @@ -855,10 +855,9 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p } if (QUEUE_IS_EMPTY(&plist->conns)) { - if (plist->size >= pInst->connLimitNum) { + if (plist->totalSize >= pInst->connLimitNum) { return TSDB_CODE_RPC_MAX_SESSIONS; } - plist->totaSize += 1; return TSDB_CODE_RPC_NETWORK_BUSY; } @@ -1046,7 +1045,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->hostThrd = pThrd; conn->seq = 0; - conn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + conn->pQTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (conn->pQTable == NULL) { TAOS_CHECK_GOTO(terrno, NULL, _failed); } @@ -1249,7 +1248,7 @@ static void cliHandleException(SCliConn* conn) { cliDestroyAllQidFromThrd(conn); QUEUE_REMOVE(&conn->q); if (conn->list) { - conn->list->totaSize -= 1; + conn->list->totalSize -= 1; conn->list = NULL; } @@ -1548,10 +1547,15 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) { } transRefCliHandle(conn); + + conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); + if (conn->list != NULL) { + conn->list->totalSize += 1; + } + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); } @@ -2363,7 +2367,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { } } - pThrd->pool = createConnPool(4); + pThrd->pool = createConnPool(128); if (pThrd->pool == NULL) { code = terrno; TAOS_CHECK_GOTO(terrno, NULL, _end); @@ -2382,22 +2386,22 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->destroyAhandleFp = pInst->destroyFp; - pThrd->fqdn2ipCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->fqdn2ipCache == NULL) { TAOS_CHECK_GOTO(terrno, NULL, _end); } - pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->batchCache == NULL) { TAOS_CHECK_GOTO(terrno, NULL, _end); } - pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->connHeapCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } - pThrd->pIdConnTable = taosHashInit(512, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + pThrd->pIdConnTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pThrd->connHeapCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } @@ -3739,7 +3743,7 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) { tTrace("conn %p get list %p from pool for key:%s", pConn, pConn->list, key); } } - if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) { + if (pConn->list && pConn->list->totalSize >= pInst->connLimitNum / 4) { tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn); if (cliConnRemoveTimeoutMsg(pConn)) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a7c24f3fae..5723f2ff23 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -239,7 +239,7 @@ SIpWhiteListTab* uvWhiteListCreate() { return NULL; } - pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); + pWhiteList->pList = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); if (pWhiteList->pList == NULL) { taosMemoryFree(pWhiteList); return NULL; @@ -1333,7 +1333,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { QUEUE_INIT(&exh->q); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId); - pConn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + pConn->pQTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pConn->pQTable == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } diff --git a/tests/system-test/2-query/fill_with_group.py b/tests/system-test/2-query/fill_with_group.py index 2139bbbfb3..3b98ec30ce 100644 --- a/tests/system-test/2-query/fill_with_group.py +++ b/tests/system-test/2-query/fill_with_group.py @@ -237,11 +237,123 @@ class TDTestCase: tdSql.checkData(12, 1, None) tdSql.checkData(13, 1, None) + def test_fill_with_complex_expr(self): + sql = "SELECT _wstart, _wstart + 1d, count(*), now, 1+1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' INTERVAL(5m) FILL(NULL)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(12) + for i in range(0, 12, 2): + tdSql.checkData(i, 2, 10) + for i in range(1, 12, 2): + tdSql.checkData(i, 2, None) + for i in range(0, 12): + firstCol = tdSql.getData(i, 0) + secondCol = tdSql.getData(i, 1) + tdLog.debug(f"firstCol: {firstCol}, secondCol: {secondCol}, secondCol - firstCol: {secondCol - firstCol}") + if secondCol - firstCol != timedelta(days=1): + tdLog.exit(f"query error: secondCol - firstCol: {secondCol - firstCol}") + nowCol = tdSql.getData(i, 3) + if nowCol is None: + tdLog.exit(f"query error: nowCol: {nowCol}") + constCol = tdSql.getData(i, 4) + if constCol != 2: + tdLog.exit(f"query error: constCol: {constCol}") + + sql = "SELECT _wstart + 1d, count(*), last(ts) + 1a, timediff(_wend, last(ts)) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' INTERVAL(5m) FILL(NULL)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(12) + for i in range(0, 12, 2): + tdSql.checkData(i, 1, 10) + tdSql.checkData(i, 3, 300000) + for i in range(1, 12, 2): + tdSql.checkData(i, 1, None) + tdSql.checkData(i, 2, None) + tdSql.checkData(i, 3, None) + + sql = "SELECT count(*), tbname FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(NULL)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(120) + + sql = "SELECT * from (SELECT count(*), timediff(_wend, last(ts)) + t1, tbname FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) LIMIT 1) order by tbname" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(10) + j = 0 + for i in range(0, 10): + tdSql.checkData(i, 1, 300000 + j) + j = j + 1 + if j == 5: + j = 0 + + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, tbname,t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) ORDER BY timediff(last(ts), _wstart)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(120) + + sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart)+ t1 >= 1) ORDER BY timediff(last(ts), _wstart)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(48) + + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart) + t1 >= 1) ORDER BY timediff(last(ts), _wstart), tbname" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(48) + + sql = "SELECT count(*) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart) >= 0)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(60) + + sql = "SELECT count(*) + 1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(count(*) > 1)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(0) + + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(value, 0, 0) HAVING(timediff(last(ts), _wstart) + t1 >= 1) ORDER BY timediff(last(ts), _wstart), tbname" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(48) + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(value, 0, 0) HAVING(count(*) >= 0) ORDER BY timediff(last(ts), _wstart), tbname" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(120) + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(value, 0, 0) HAVING(count(*) > 0) ORDER BY timediff(last(ts), _wstart), tbname" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(60) + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(linear) HAVING(count(*) >= 0 and t1 <= 1) ORDER BY timediff(last(ts), _wstart), tbname, t1" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(44) + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(prev) HAVING(count(*) >= 0 and t1 > 1) ORDER BY timediff(last(ts), _wstart), tbname, t1" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(72) + + sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(linear) ORDER BY tbname, _wstart;" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(120) + for i in range(11, 120, 12): + tdSql.checkData(i, 1, None) + for i in range(0, 120): + tdSql.checkData(i, 0, 2) + + sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(linear) HAVING(count(*) >= 0) ORDER BY tbname;" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(110) + for i in range(0, 110, 11): + lastCol = tdSql.getData(i, 3) + tdLog.debug(f"lastCol: {lastCol}") + if lastCol[-1:] != str(i//11): + tdLog.exit(f"query error: lastCol: {lastCol}") + + sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1,t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY t1 INTERVAL(5m) FILL(linear) ORDER BY t1, _wstart;" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(60) + + sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1,t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY t1 INTERVAL(5m) FILL(linear) HAVING(count(*) > 0) ORDER BY t1, _wstart;" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(55) + + # TODO Fix Me! + sql = "explain SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like '%asd');" + tdSql.error(sql, -2147473664) # Error: Planner internal error + def run(self): self.prepareTestEnv() self.test_partition_by_with_interval_fill_prev_new_group_fill_error() self.test_fill_with_order_by() self.test_fill_with_order_by2() + self.test_fill_with_complex_expr() def stop(self): tdSql.close()