Merge pull request #28485 from taosdata/merge/30tomain
merge: from 3.0 to main branch
This commit is contained in:
commit
9a7000da4a
|
@ -80,7 +80,7 @@ These pseudocolumns occur after the aggregation clause.
|
||||||
`FILL` clause is used to specify how to fill when there is data missing in any window, including:
|
`FILL` clause is used to specify how to fill when there is data missing in any window, including:
|
||||||
|
|
||||||
1. NONE: No fill (the default fill mode)
|
1. NONE: No fill (the default fill mode)
|
||||||
2. VALUE: Fill with a fixed value, which should be specified together, for example `FILL(VALUE, 1.23)` Note: The value filled depends on the data type. For example, if you run FILL(VALUE 1.23) on an integer column, the value 1 is filled. If multiple columns in select list need to be filled, then in the fill clause there must be a fill value for each of these columns, for example, `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`.
|
2. VALUE: Fill with a fixed value, which should be specified together, for example `FILL(VALUE, 1.23)` Note: The value filled depends on the data type. For example, if you run FILL(VALUE 1.23) on an integer column, the value 1 is filled. If multiple columns in select list need to be filled, then in the fill clause there must be a fill value for each of these columns, for example, `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`. Note that only exprs in select list that contains normal cols need to specify fill value, exprs like `_wstart`, `_wend`, `_wduration`, `_wstart + 1a`, `now`, `1+1`, partition keys like tbname(when using partition by) don't need to specify fill value. But exprs like `timediff(last(ts), _wstart)` need to specify fill value.
|
||||||
3. PREV: Fill with the previous non-NULL value, `FILL(PREV)`
|
3. PREV: Fill with the previous non-NULL value, `FILL(PREV)`
|
||||||
4. NULL: Fill with NULL, `FILL(NULL)`
|
4. NULL: Fill with NULL, `FILL(NULL)`
|
||||||
5. LINEAR: Fill with the closest non-NULL value, `FILL(LINEAR)`
|
5. LINEAR: Fill with the closest non-NULL value, `FILL(LINEAR)`
|
||||||
|
|
|
@ -76,7 +76,7 @@ window_clause: {
|
||||||
FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
|
FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
|
||||||
|
|
||||||
1. 不进行填充:NONE(默认填充模式)。
|
1. 不进行填充:NONE(默认填充模式)。
|
||||||
2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如 FILL(VALUE, 1.23),相应列为 INT 类型,则填充值为 1, 若查询列表中有多列需要FILL, 则需要给每一个FILL列指定VALUE, 如`SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`。
|
2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如 FILL(VALUE, 1.23),相应列为 INT 类型,则填充值为 1, 若查询列表中有多列需要 FILL, 则需要给每一个 FILL 列指定 VALUE, 如 `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`, 注意, SELECT 表达式中只有包含普通列时才需要指定 FILL VALUE, 如 `_wstart`, `_wstart+1a`, `now`, `1+1` 以及使用 partition by 时的 partition key (如 tbname)都不需要指定 VALUE, 如 `timediff(last(ts), _wstart)` 则需要指定VALUE。
|
||||||
3. PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
|
3. PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
|
||||||
4. NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
|
4. NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
|
||||||
5. LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
|
5. LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
|
||||||
|
|
|
@ -333,6 +333,7 @@ typedef struct SFillLogicNode {
|
||||||
SNode* pWStartTs;
|
SNode* pWStartTs;
|
||||||
SNode* pValues; // SNodeListNode
|
SNode* pValues; // SNodeListNode
|
||||||
STimeWindow timeRange;
|
STimeWindow timeRange;
|
||||||
|
SNodeList* pFillNullExprs;
|
||||||
} SFillLogicNode;
|
} SFillLogicNode;
|
||||||
|
|
||||||
typedef struct SSortLogicNode {
|
typedef struct SSortLogicNode {
|
||||||
|
@ -677,6 +678,7 @@ typedef struct SFillPhysiNode {
|
||||||
SNode* pWStartTs; // SColumnNode
|
SNode* pWStartTs; // SColumnNode
|
||||||
SNode* pValues; // SNodeListNode
|
SNode* pValues; // SNodeListNode
|
||||||
STimeWindow timeRange;
|
STimeWindow timeRange;
|
||||||
|
SNodeList* pFillNullExprs;
|
||||||
} SFillPhysiNode;
|
} SFillPhysiNode;
|
||||||
|
|
||||||
typedef SFillPhysiNode SStreamFillPhysiNode;
|
typedef SFillPhysiNode SStreamFillPhysiNode;
|
||||||
|
|
|
@ -56,7 +56,7 @@ int32_t tsShellActivityTimer = 3; // second
|
||||||
// queue & threads
|
// queue & threads
|
||||||
int32_t tsNumOfRpcThreads = 1;
|
int32_t tsNumOfRpcThreads = 1;
|
||||||
int32_t tsNumOfRpcSessions = 30000;
|
int32_t tsNumOfRpcSessions = 30000;
|
||||||
int32_t tsShareConnLimit = 8;
|
int32_t tsShareConnLimit = 10;
|
||||||
int32_t tsReadTimeout = 900;
|
int32_t tsReadTimeout = 900;
|
||||||
int32_t tsTimeToGetAvailableConn = 500000;
|
int32_t tsTimeToGetAvailableConn = 500000;
|
||||||
int32_t tsKeepAliveIdle = 60;
|
int32_t tsKeepAliveIdle = 60;
|
||||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SFillColInfo {
|
||||||
SExprInfo* pExpr;
|
SExprInfo* pExpr;
|
||||||
bool notFillCol; // denote if this column needs fill operation
|
bool notFillCol; // denote if this column needs fill operation
|
||||||
SVariant fillVal;
|
SVariant fillVal;
|
||||||
|
bool fillNull;
|
||||||
} SFillColInfo;
|
} SFillColInfo;
|
||||||
|
|
||||||
typedef struct SFillLinearInfo {
|
typedef struct SFillLinearInfo {
|
||||||
|
@ -125,12 +126,14 @@ void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struc
|
||||||
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts);
|
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts);
|
||||||
bool taosFillNotStarted(const SFillInfo* pFillInfo);
|
bool taosFillNotStarted(const SFillInfo* pFillInfo);
|
||||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
|
||||||
|
const struct SNodeListNode* val);
|
||||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||||
|
|
||||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols,
|
||||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
|
int32_t slotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo,
|
||||||
|
SFillInfo** ppFillInfo);
|
||||||
|
|
||||||
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
||||||
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||||
|
|
|
@ -53,6 +53,7 @@ typedef struct SFillOperatorInfo {
|
||||||
SExprInfo* pExprInfo;
|
SExprInfo* pExprInfo;
|
||||||
int32_t numOfExpr;
|
int32_t numOfExpr;
|
||||||
SExprSupp noFillExprSupp;
|
SExprSupp noFillExprSupp;
|
||||||
|
SExprSupp fillNullExprSupp;
|
||||||
} SFillOperatorInfo;
|
} SFillOperatorInfo;
|
||||||
|
|
||||||
static void destroyFillOperatorInfo(void* param);
|
static void destroyFillOperatorInfo(void* param);
|
||||||
|
@ -140,6 +141,15 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int
|
||||||
code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
|
code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
|
||||||
NULL);
|
NULL);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
if (pInfo->fillNullExprSupp.pExprInfo) {
|
||||||
|
pInfo->pRes->info.rows = 0;
|
||||||
|
code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
|
||||||
|
pInfo->fillNullExprSupp.numOfExprs, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
|
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -327,6 +337,7 @@ void destroyFillOperatorInfo(void* param) {
|
||||||
pInfo->pFinalRes = NULL;
|
pInfo->pFinalRes = NULL;
|
||||||
|
|
||||||
cleanupExprSupp(&pInfo->noFillExprSupp);
|
cleanupExprSupp(&pInfo->noFillExprSupp);
|
||||||
|
cleanupExprSupp(&pInfo->fillNullExprSupp);
|
||||||
|
|
||||||
taosMemoryFreeClear(pInfo->p);
|
taosMemoryFreeClear(pInfo->p);
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
|
@ -334,10 +345,11 @@ void destroyFillOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
|
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
|
||||||
int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
|
int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
|
||||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order,
|
SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) {
|
||||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
SFillColInfo* pColInfo =
|
||||||
|
createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode);
|
||||||
if (!pColInfo) {
|
if (!pColInfo) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -348,8 +360,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
// STimeWindow w = {0};
|
// STimeWindow w = {0};
|
||||||
// getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
|
// getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
|
||||||
pInfo->pFillInfo = NULL;
|
pInfo->pFillInfo = NULL;
|
||||||
int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval,
|
||||||
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
@ -455,6 +467,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
||||||
initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
|
initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
|
code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
|
||||||
|
&pInfo->fillNullExprSupp.numOfExprs);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
|
||||||
|
&pTaskInfo->storageAPI.functionStore);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
SInterval* pInterval =
|
SInterval* pInterval =
|
||||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||||
|
@ -482,7 +501,9 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
||||||
code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
|
code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
|
||||||
COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
||||||
|
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
|
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
|
||||||
|
pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
|
||||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||||
pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
|
pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -1201,7 +1201,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
|
pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
|
||||||
(const SNodeListNode*)(pPhyFillNode->pValues));
|
NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
|
||||||
if (pFillSup->pAllColInfo == NULL) {
|
if (pFillSup->pAllColInfo == NULL) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
lino = __LINE__;
|
lino = __LINE__;
|
||||||
|
|
|
@ -39,6 +39,10 @@
|
||||||
static int32_t doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
static int32_t doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
|
||||||
|
|
||||||
static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, int32_t rowIndex, int32_t colIdx) {
|
static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, int32_t rowIndex, int32_t colIdx) {
|
||||||
|
SFillColInfo* pCol = &pFillInfo->pFillCol[colIdx];
|
||||||
|
if (pCol->fillNull) {
|
||||||
|
colDataSetNULL(pDstColInfo, rowIndex);
|
||||||
|
} else {
|
||||||
SRowVal* p = NULL;
|
SRowVal* p = NULL;
|
||||||
if (pFillInfo->type == TSDB_FILL_NEXT) {
|
if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||||
p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->next : &pFillInfo->prev;
|
p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->next : &pFillInfo->prev;
|
||||||
|
@ -56,6 +60,7 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo,
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
|
T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
|
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
|
||||||
|
@ -545,9 +550,10 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||||
return pFillInfo->numOfRows - pFillInfo->index;
|
return pFillInfo->numOfRows - pFillInfo->index;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols,
|
||||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
|
int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
|
||||||
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
|
int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo,
|
||||||
|
SFillInfo** ppFillInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
if (fillType == TSDB_FILL_NONE) {
|
if (fillType == TSDB_FILL_NONE) {
|
||||||
|
@ -574,7 +580,7 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi
|
||||||
|
|
||||||
pFillInfo->type = fillType;
|
pFillInfo->type = fillType;
|
||||||
pFillInfo->pFillCol = pCol;
|
pFillInfo->pFillCol = pCol;
|
||||||
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols;
|
pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols + fillNullCols;
|
||||||
pFillInfo->alloc = capacity;
|
pFillInfo->alloc = capacity;
|
||||||
pFillInfo->id = id;
|
pFillInfo->id = id;
|
||||||
pFillInfo->interval = *pInterval;
|
pFillInfo->interval = *pInterval;
|
||||||
|
@ -761,10 +767,11 @@ _end:
|
||||||
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; }
|
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start; }
|
||||||
|
|
||||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||||
int32_t numOfNoFillExpr, const struct SNodeListNode* pValNode) {
|
int32_t numOfNoFillExpr, SExprInfo* pFillNullExpr, int32_t numOfFillNullExpr,
|
||||||
|
const struct SNodeListNode* pValNode) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr, sizeof(SFillColInfo));
|
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr + numOfFillNullExpr, sizeof(SFillColInfo));
|
||||||
if (pFillCol == NULL) {
|
if (pFillCol == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -797,6 +804,13 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
||||||
pFillCol[i + numOfFillExpr].notFillCol = true;
|
pFillCol[i + numOfFillExpr].notFillCol = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfFillNullExpr; ++i) {
|
||||||
|
SExprInfo* pExprInfo = &pFillNullExpr[i];
|
||||||
|
pFillCol[i + numOfFillExpr + numOfNoFillExpr].pExpr = pExprInfo;
|
||||||
|
pFillCol[i + numOfFillExpr + numOfNoFillExpr].notFillCol = true;
|
||||||
|
pFillCol[i + numOfFillExpr + numOfNoFillExpr].fillNull = true;
|
||||||
|
}
|
||||||
|
|
||||||
return pFillCol;
|
return pFillCol;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
|
|
@ -1147,7 +1147,8 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
||||||
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
|
||||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
pInfo->pFillColInfo =
|
||||||
|
createFillColInfo(pExprInfo, numOfExprs, NULL, 0, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||||
QUERY_CHECK_NULL(pInfo->pFillColInfo, code, lino, _error, terrno);
|
QUERY_CHECK_NULL(pInfo->pFillColInfo, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->pLinearInfo = NULL;
|
pInfo->pLinearInfo = NULL;
|
||||||
|
|
|
@ -642,6 +642,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
|
||||||
CLONE_NODE_FIELD(pWStartTs);
|
CLONE_NODE_FIELD(pWStartTs);
|
||||||
CLONE_NODE_FIELD(pValues);
|
CLONE_NODE_FIELD(pValues);
|
||||||
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
|
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
|
||||||
|
CLONE_NODE_LIST_FIELD(pFillNullExprs);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2887,6 +2887,7 @@ static const char* jkFillPhysiPlanWStartTs = "WStartTs";
|
||||||
static const char* jkFillPhysiPlanValues = "Values";
|
static const char* jkFillPhysiPlanValues = "Values";
|
||||||
static const char* jkFillPhysiPlanStartTime = "StartTime";
|
static const char* jkFillPhysiPlanStartTime = "StartTime";
|
||||||
static const char* jkFillPhysiPlanEndTime = "EndTime";
|
static const char* jkFillPhysiPlanEndTime = "EndTime";
|
||||||
|
static const char* jkFillPhysiPlanFillNullExprs = "FillNullExprs";
|
||||||
|
|
||||||
static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj;
|
const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj;
|
||||||
|
@ -2913,6 +2914,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanEndTime, pNode->timeRange.ekey);
|
code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanEndTime, pNode->timeRange.ekey);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkFillPhysiPlanFillNullExprs, pNode->pFillNullExprs);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2942,6 +2946,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanEndTime, &pNode->timeRange.ekey);
|
code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanEndTime, &pNode->timeRange.ekey);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkFillPhysiPlanFillNullExprs, &pNode->pFillNullExprs);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3326,7 +3326,8 @@ enum {
|
||||||
PHY_FILL_CODE_WSTART,
|
PHY_FILL_CODE_WSTART,
|
||||||
PHY_FILL_CODE_VALUES,
|
PHY_FILL_CODE_VALUES,
|
||||||
PHY_FILL_CODE_TIME_RANGE,
|
PHY_FILL_CODE_TIME_RANGE,
|
||||||
PHY_FILL_CODE_INPUT_TS_ORDER
|
PHY_FILL_CODE_INPUT_TS_ORDER,
|
||||||
|
PHY_FILL_CODE_FILL_NULL_EXPRS,
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -3351,6 +3352,9 @@ static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_TIME_RANGE, timeWindowToMsg, &pNode->timeRange);
|
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_TIME_RANGE, timeWindowToMsg, &pNode->timeRange);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_FILL_NULL_EXPRS, nodeListToMsg, pNode->pFillNullExprs);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3383,6 +3387,9 @@ static int32_t msgToPhysiFillNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_FILL_CODE_TIME_RANGE:
|
case PHY_FILL_CODE_TIME_RANGE:
|
||||||
code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, (void**)&pNode->timeRange);
|
code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, (void**)&pNode->timeRange);
|
||||||
break;
|
break;
|
||||||
|
case PHY_FILL_CODE_FILL_NULL_EXPRS:
|
||||||
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFillNullExprs);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1495,6 +1495,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyNode(pLogicNode->pValues);
|
nodesDestroyNode(pLogicNode->pValues);
|
||||||
nodesDestroyList(pLogicNode->pFillExprs);
|
nodesDestroyList(pLogicNode->pFillExprs);
|
||||||
nodesDestroyList(pLogicNode->pNotFillExprs);
|
nodesDestroyList(pLogicNode->pNotFillExprs);
|
||||||
|
nodesDestroyList(pLogicNode->pFillNullExprs);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||||
|
@ -1666,6 +1667,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyList(pPhyNode->pNotFillExprs);
|
nodesDestroyList(pPhyNode->pNotFillExprs);
|
||||||
nodesDestroyNode(pPhyNode->pWStartTs);
|
nodesDestroyNode(pPhyNode->pWStartTs);
|
||||||
nodesDestroyNode(pPhyNode->pValues);
|
nodesDestroyNode(pPhyNode->pValues);
|
||||||
|
nodesDestroyList(pPhyNode->pFillNullExprs);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
|
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
|
||||||
|
|
|
@ -1279,71 +1279,139 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SCollectFillExprsCtx {
|
||||||
|
SHashObj* pPseudoCols;
|
||||||
|
SNodeList* pFillExprs;
|
||||||
|
SNodeList* pNotFillExprs;
|
||||||
|
bool collectAggFuncs;
|
||||||
|
SNodeList* pAggFuncCols;
|
||||||
|
} SCollectFillExprsCtx;
|
||||||
|
|
||||||
|
typedef struct SWalkFillSubExprCtx {
|
||||||
|
bool hasFillCol;
|
||||||
|
bool hasPseudoWinCol;
|
||||||
|
bool hasGroupKeyCol;
|
||||||
|
SCollectFillExprsCtx* pCollectFillCtx;
|
||||||
|
int32_t code;
|
||||||
|
} SWalkFillSubExprCtx;
|
||||||
|
|
||||||
|
static bool nodeAlreadyContained(SNodeList* pList, SNode* pNode) {
|
||||||
|
SNode* pExpr = NULL;
|
||||||
|
FOREACH(pExpr, pList) {
|
||||||
|
if (nodesEqualNode(pExpr, pNode)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
|
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
|
||||||
|
SWalkFillSubExprCtx *pCtx = pContext;
|
||||||
|
EDealRes res = DEAL_RES_CONTINUE;
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
if (COLUMN_TYPE_WINDOW_START != pCol->colType && COLUMN_TYPE_WINDOW_END != pCol->colType &&
|
if (COLUMN_TYPE_WINDOW_START == pCol->colType || COLUMN_TYPE_WINDOW_END == pCol->colType ||
|
||||||
COLUMN_TYPE_WINDOW_DURATION != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) {
|
COLUMN_TYPE_WINDOW_DURATION == pCol->colType) {
|
||||||
*(bool*)pContext = true;
|
pCtx->hasPseudoWinCol = true;
|
||||||
return DEAL_RES_END;
|
pCtx->code =
|
||||||
|
taosHashPut(pCtx->pCollectFillCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES);
|
||||||
|
} else if (COLUMN_TYPE_GROUP_KEY == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType ||
|
||||||
|
COLUMN_TYPE_TAG == pCol->colType) {
|
||||||
|
pCtx->hasGroupKeyCol = true;
|
||||||
|
pCtx->code =
|
||||||
|
taosHashPut(pCtx->pCollectFillCtx->pPseudoCols, pCol->colName, TSDB_COL_NAME_LEN, &pNode, POINTER_BYTES);
|
||||||
|
} else {
|
||||||
|
pCtx->hasFillCol = true;
|
||||||
|
if (pCtx->pCollectFillCtx->collectAggFuncs) {
|
||||||
|
// Agg funcs has already been rewriten to columns by Interval
|
||||||
|
// Here, we return DEAL_RES_CONTINUE cause we need to collect all agg funcs
|
||||||
|
if (!nodeAlreadyContained(pCtx->pCollectFillCtx->pFillExprs, pNode) &&
|
||||||
|
!nodeAlreadyContained(pCtx->pCollectFillCtx->pAggFuncCols, pNode))
|
||||||
|
pCtx->code = nodesListMakeStrictAppend(&pCtx->pCollectFillCtx->pAggFuncCols, pNode);
|
||||||
|
} else {
|
||||||
|
res = DEAL_RES_END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
}
|
||||||
|
if (pCtx->code != TSDB_CODE_SUCCESS) res = DEAL_RES_ERROR;
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool needFillValue(SNode* pNode) {
|
static void needFillValue(SNode* pNode, SWalkFillSubExprCtx* pCtx) {
|
||||||
bool hasFillCol = false;
|
nodesWalkExpr(pNode, needFillValueImpl, pCtx);
|
||||||
nodesWalkExpr(pNode, needFillValueImpl, &hasFillCol);
|
|
||||||
return hasFillCol;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t partFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs) {
|
static int32_t collectFillExpr(SNode* pNode, SCollectFillExprsCtx* pCollectFillCtx) {
|
||||||
|
SNode* pNew = NULL;
|
||||||
|
SWalkFillSubExprCtx collectFillSubExprCtx = {
|
||||||
|
.hasFillCol = false, .hasPseudoWinCol = false, .hasGroupKeyCol = false, .pCollectFillCtx = pCollectFillCtx};
|
||||||
|
needFillValue(pNode, &collectFillSubExprCtx);
|
||||||
|
if (collectFillSubExprCtx.code != TSDB_CODE_SUCCESS) {
|
||||||
|
return collectFillSubExprCtx.code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (collectFillSubExprCtx.hasFillCol && !pCollectFillCtx->collectAggFuncs) {
|
||||||
|
if (nodeType(pNode) == QUERY_NODE_ORDER_BY_EXPR) {
|
||||||
|
collectFillSubExprCtx.code = nodesCloneNode(((SOrderByExprNode*)pNode)->pExpr, &pNew);
|
||||||
|
} else {
|
||||||
|
collectFillSubExprCtx.code = nodesCloneNode(pNode, &pNew);
|
||||||
|
}
|
||||||
|
if (collectFillSubExprCtx.code == TSDB_CODE_SUCCESS) {
|
||||||
|
collectFillSubExprCtx.code = nodesListMakeStrictAppend(&pCollectFillCtx->pFillExprs, pNew);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return collectFillSubExprCtx.code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t collectFillExprs(SSelectStmt* pSelect, SNodeList** pFillExprs, SNodeList** pNotFillExprs,
|
||||||
|
SNodeList** pPossibleFillNullCols) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SNode* pProject = NULL;
|
SCollectFillExprsCtx collectFillCtx = {0};
|
||||||
FOREACH(pProject, pSelect->pProjectionList) {
|
SNode* pNode = NULL;
|
||||||
if (needFillValue(pProject)) {
|
collectFillCtx.pPseudoCols = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
SNode* pNew = NULL;
|
if (!collectFillCtx.pPseudoCols) return terrno;
|
||||||
code = nodesCloneNode(pProject, &pNew);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
FOREACH(pNode, pSelect->pProjectionList) {
|
||||||
code = nodesListMakeStrictAppend(pFillExprs, pNew);
|
code = collectFillExpr(pNode, &collectFillCtx);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
} else if (QUERY_NODE_VALUE != nodeType(pProject)) {
|
collectFillCtx.collectAggFuncs = true;
|
||||||
SNode* pNew = NULL;
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = nodesCloneNode(pProject, &pNew);
|
code = collectFillExpr(pSelect->pHaving, &collectFillCtx);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
}
|
||||||
code = nodesListMakeStrictAppend(pNotFillExprs, pNew);
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
FOREACH(pNode, pSelect->pOrderByList) {
|
||||||
|
code = collectFillExpr(pNode, &collectFillCtx);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
NODES_DESTORY_LIST(*pFillExprs);
|
void* pIter = taosHashIterate(collectFillCtx.pPseudoCols, 0);
|
||||||
NODES_DESTORY_LIST(*pNotFillExprs);
|
while (pIter) {
|
||||||
|
SNode* pNode = *(SNode**)pIter, *pNew = NULL;
|
||||||
|
code = nodesCloneNode(pNode, &pNew);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
code = nodesListMakeStrictAppend(&collectFillCtx.pNotFillExprs, pNew);
|
||||||
|
}
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
pIter = taosHashIterate(collectFillCtx.pPseudoCols, pIter);
|
||||||
|
} else {
|
||||||
|
taosHashCancelIterate(collectFillCtx.pPseudoCols, pIter);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!pSelect->isDistinct) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
SNode* pOrderExpr = NULL;
|
TSWAP(*pFillExprs, collectFillCtx.pFillExprs);
|
||||||
FOREACH(pOrderExpr, pSelect->pOrderByList) {
|
TSWAP(*pNotFillExprs, collectFillCtx.pNotFillExprs);
|
||||||
SNode* pExpr = ((SOrderByExprNode*)pOrderExpr)->pExpr;
|
TSWAP(*pPossibleFillNullCols, collectFillCtx.pAggFuncCols);
|
||||||
if (needFillValue(pExpr)) {
|
|
||||||
SNode* pNew = NULL;
|
|
||||||
code = nodesCloneNode(pExpr, &pNew);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesListMakeStrictAppend(pFillExprs, pNew);
|
|
||||||
}
|
|
||||||
} else if (QUERY_NODE_VALUE != nodeType(pExpr)) {
|
|
||||||
SNode* pNew = NULL;
|
|
||||||
code = nodesCloneNode(pExpr, &pNew);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesListMakeStrictAppend(pNotFillExprs, pNew);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
NODES_DESTORY_LIST(*pFillExprs);
|
|
||||||
NODES_DESTORY_LIST(*pNotFillExprs);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (collectFillCtx.pFillExprs) nodesDestroyList(collectFillCtx.pFillExprs);
|
||||||
|
if (collectFillCtx.pNotFillExprs) nodesDestroyList(collectFillCtx.pNotFillExprs);
|
||||||
|
if (collectFillCtx.pAggFuncCols) nodesDestroyList(collectFillCtx.pAggFuncCols);
|
||||||
}
|
}
|
||||||
|
taosHashCleanup(collectFillCtx.pPseudoCols);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1369,13 +1437,16 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
pFill->node.resultDataOrder = pFill->node.requireDataOrder;
|
pFill->node.resultDataOrder = pFill->node.requireDataOrder;
|
||||||
pFill->node.inputTsOrder = TSDB_ORDER_ASC;
|
pFill->node.inputTsOrder = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
code = partFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs);
|
code = collectFillExprs(pSelect, &pFill->pFillExprs, &pFill->pNotFillExprs, &pFill->pFillNullExprs);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
code = rewriteExprsForSelect(pFill->pFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
code = rewriteExprsForSelect(pFill->pNotFillExprs, pSelect, SQL_CLAUSE_FILL, NULL);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFill->pFillNullExprs) > 0) {
|
||||||
|
code = createColumnByRewriteExprs(pFill->pFillNullExprs, &pFill->node.pTargets);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets);
|
code = createColumnByRewriteExprs(pFill->pFillExprs, &pFill->node.pTargets);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2605,6 +2605,12 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
|
code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFillNode->pFillNullExprs) > 0) {
|
||||||
|
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillNullExprs, &pFill->pFillNullExprs);
|
||||||
|
if (TSDB_CODE_SUCCESS == code ) {
|
||||||
|
code = addDataBlockSlots(pCxt, pFill->pFillNullExprs, pFill->node.pOutputDataBlockDesc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
|
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
|
||||||
|
|
|
@ -27,7 +27,7 @@ typedef struct {
|
||||||
typedef struct SConnList {
|
typedef struct SConnList {
|
||||||
queue conns;
|
queue conns;
|
||||||
int32_t size;
|
int32_t size;
|
||||||
int32_t totaSize;
|
int32_t totalSize;
|
||||||
} SConnList;
|
} SConnList;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -855,10 +855,9 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
if (QUEUE_IS_EMPTY(&plist->conns)) {
|
||||||
if (plist->size >= pInst->connLimitNum) {
|
if (plist->totalSize >= pInst->connLimitNum) {
|
||||||
return TSDB_CODE_RPC_MAX_SESSIONS;
|
return TSDB_CODE_RPC_MAX_SESSIONS;
|
||||||
}
|
}
|
||||||
plist->totaSize += 1;
|
|
||||||
return TSDB_CODE_RPC_NETWORK_BUSY;
|
return TSDB_CODE_RPC_NETWORK_BUSY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1046,7 +1045,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
conn->seq = 0;
|
conn->seq = 0;
|
||||||
|
|
||||||
conn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
conn->pQTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (conn->pQTable == NULL) {
|
if (conn->pQTable == NULL) {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _failed);
|
TAOS_CHECK_GOTO(terrno, NULL, _failed);
|
||||||
}
|
}
|
||||||
|
@ -1249,7 +1248,7 @@ static void cliHandleException(SCliConn* conn) {
|
||||||
cliDestroyAllQidFromThrd(conn);
|
cliDestroyAllQidFromThrd(conn);
|
||||||
QUEUE_REMOVE(&conn->q);
|
QUEUE_REMOVE(&conn->q);
|
||||||
if (conn->list) {
|
if (conn->list) {
|
||||||
conn->list->totaSize -= 1;
|
conn->list->totalSize -= 1;
|
||||||
conn->list = NULL;
|
conn->list = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1548,10 +1547,15 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
transRefCliHandle(conn);
|
transRefCliHandle(conn);
|
||||||
|
|
||||||
|
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||||
|
if (conn->list != NULL) {
|
||||||
|
conn->list->totalSize += 1;
|
||||||
|
}
|
||||||
|
|
||||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret));
|
tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret));
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2363,7 +2367,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->pool = createConnPool(4);
|
pThrd->pool = createConnPool(128);
|
||||||
if (pThrd->pool == NULL) {
|
if (pThrd->pool == NULL) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
||||||
|
@ -2382,22 +2386,22 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
|
||||||
|
|
||||||
pThrd->destroyAhandleFp = pInst->destroyFp;
|
pThrd->destroyAhandleFp = pInst->destroyFp;
|
||||||
|
|
||||||
pThrd->fqdn2ipCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (pThrd->fqdn2ipCache == NULL) {
|
if (pThrd->fqdn2ipCache == NULL) {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (pThrd->batchCache == NULL) {
|
if (pThrd->batchCache == NULL) {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->connHeapCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (pThrd->connHeapCache == NULL) {
|
if (pThrd->connHeapCache == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->pIdConnTable = taosHashInit(512, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
pThrd->pIdConnTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
if (pThrd->connHeapCache == NULL) {
|
if (pThrd->connHeapCache == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
|
||||||
}
|
}
|
||||||
|
@ -3739,7 +3743,7 @@ static FORCE_INLINE int8_t shouldSWitchToOtherConn(SCliConn* pConn, char* key) {
|
||||||
tTrace("conn %p get list %p from pool for key:%s", pConn, pConn->list, key);
|
tTrace("conn %p get list %p from pool for key:%s", pConn, pConn->list, key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pConn->list && pConn->list->totaSize >= pInst->connLimitNum / 4) {
|
if (pConn->list && pConn->list->totalSize >= pInst->connLimitNum / 4) {
|
||||||
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
|
tWarn("%s conn %p try to remove timeout msg since too many conn created", transLabel(pInst), pConn);
|
||||||
|
|
||||||
if (cliConnRemoveTimeoutMsg(pConn)) {
|
if (cliConnRemoveTimeoutMsg(pConn)) {
|
||||||
|
|
|
@ -239,7 +239,7 @@ SIpWhiteListTab* uvWhiteListCreate() {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
|
pWhiteList->pList = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
|
||||||
if (pWhiteList->pList == NULL) {
|
if (pWhiteList->pList == NULL) {
|
||||||
taosMemoryFree(pWhiteList);
|
taosMemoryFree(pWhiteList);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1333,7 +1333,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
|
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pInst), exh, pConn, pConn->refId);
|
||||||
|
|
||||||
pConn->pQTable = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pConn->pQTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (pConn->pQTable == NULL) {
|
if (pConn->pQTable == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
|
||||||
}
|
}
|
||||||
|
|
|
@ -237,11 +237,123 @@ class TDTestCase:
|
||||||
tdSql.checkData(12, 1, None)
|
tdSql.checkData(12, 1, None)
|
||||||
tdSql.checkData(13, 1, None)
|
tdSql.checkData(13, 1, None)
|
||||||
|
|
||||||
|
def test_fill_with_complex_expr(self):
|
||||||
|
sql = "SELECT _wstart, _wstart + 1d, count(*), now, 1+1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' INTERVAL(5m) FILL(NULL)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(12)
|
||||||
|
for i in range(0, 12, 2):
|
||||||
|
tdSql.checkData(i, 2, 10)
|
||||||
|
for i in range(1, 12, 2):
|
||||||
|
tdSql.checkData(i, 2, None)
|
||||||
|
for i in range(0, 12):
|
||||||
|
firstCol = tdSql.getData(i, 0)
|
||||||
|
secondCol = tdSql.getData(i, 1)
|
||||||
|
tdLog.debug(f"firstCol: {firstCol}, secondCol: {secondCol}, secondCol - firstCol: {secondCol - firstCol}")
|
||||||
|
if secondCol - firstCol != timedelta(days=1):
|
||||||
|
tdLog.exit(f"query error: secondCol - firstCol: {secondCol - firstCol}")
|
||||||
|
nowCol = tdSql.getData(i, 3)
|
||||||
|
if nowCol is None:
|
||||||
|
tdLog.exit(f"query error: nowCol: {nowCol}")
|
||||||
|
constCol = tdSql.getData(i, 4)
|
||||||
|
if constCol != 2:
|
||||||
|
tdLog.exit(f"query error: constCol: {constCol}")
|
||||||
|
|
||||||
|
sql = "SELECT _wstart + 1d, count(*), last(ts) + 1a, timediff(_wend, last(ts)) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' INTERVAL(5m) FILL(NULL)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(12)
|
||||||
|
for i in range(0, 12, 2):
|
||||||
|
tdSql.checkData(i, 1, 10)
|
||||||
|
tdSql.checkData(i, 3, 300000)
|
||||||
|
for i in range(1, 12, 2):
|
||||||
|
tdSql.checkData(i, 1, None)
|
||||||
|
tdSql.checkData(i, 2, None)
|
||||||
|
tdSql.checkData(i, 3, None)
|
||||||
|
|
||||||
|
sql = "SELECT count(*), tbname FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(NULL)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(120)
|
||||||
|
|
||||||
|
sql = "SELECT * from (SELECT count(*), timediff(_wend, last(ts)) + t1, tbname FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) LIMIT 1) order by tbname"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
j = 0
|
||||||
|
for i in range(0, 10):
|
||||||
|
tdSql.checkData(i, 1, 300000 + j)
|
||||||
|
j = j + 1
|
||||||
|
if j == 5:
|
||||||
|
j = 0
|
||||||
|
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, tbname,t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) ORDER BY timediff(last(ts), _wstart)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(120)
|
||||||
|
|
||||||
|
sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart)+ t1 >= 1) ORDER BY timediff(last(ts), _wstart)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(48)
|
||||||
|
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart) + t1 >= 1) ORDER BY timediff(last(ts), _wstart), tbname"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(48)
|
||||||
|
|
||||||
|
sql = "SELECT count(*) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(timediff(last(ts), _wstart) >= 0)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(60)
|
||||||
|
|
||||||
|
sql = "SELECT count(*) + 1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(NULL) HAVING(count(*) > 1)"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(value, 0, 0) HAVING(timediff(last(ts), _wstart) + t1 >= 1) ORDER BY timediff(last(ts), _wstart), tbname"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(48)
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(value, 0, 0) HAVING(count(*) >= 0) ORDER BY timediff(last(ts), _wstart), tbname"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(120)
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname, t1 INTERVAL(5m) FILL(value, 0, 0) HAVING(count(*) > 0) ORDER BY timediff(last(ts), _wstart), tbname"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(60)
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(linear) HAVING(count(*) >= 0 and t1 <= 1) ORDER BY timediff(last(ts), _wstart), tbname, t1"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(44)
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(prev) HAVING(count(*) >= 0 and t1 > 1) ORDER BY timediff(last(ts), _wstart), tbname, t1"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(72)
|
||||||
|
|
||||||
|
sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(linear) ORDER BY tbname, _wstart;"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(120)
|
||||||
|
for i in range(11, 120, 12):
|
||||||
|
tdSql.checkData(i, 1, None)
|
||||||
|
for i in range(0, 120):
|
||||||
|
tdSql.checkData(i, 0, 2)
|
||||||
|
|
||||||
|
sql = "SELECT count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1, concat(to_char(_wstart, 'HH:MI:SS__'), tbname) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY tbname INTERVAL(5m) FILL(linear) HAVING(count(*) >= 0) ORDER BY tbname;"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(110)
|
||||||
|
for i in range(0, 110, 11):
|
||||||
|
lastCol = tdSql.getData(i, 3)
|
||||||
|
tdLog.debug(f"lastCol: {lastCol}")
|
||||||
|
if lastCol[-1:] != str(i//11):
|
||||||
|
tdLog.exit(f"query error: lastCol: {lastCol}")
|
||||||
|
|
||||||
|
sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1,t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY t1 INTERVAL(5m) FILL(linear) ORDER BY t1, _wstart;"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(60)
|
||||||
|
|
||||||
|
sql = "SELECT 1+1, count(*), timediff(_wend, last(ts)) + t1, timediff('2018-09-20 01:00:00', _wstart) + t1,t1 FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY t1 INTERVAL(5m) FILL(linear) HAVING(count(*) > 0) ORDER BY t1, _wstart;"
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
tdSql.checkRows(55)
|
||||||
|
|
||||||
|
# TODO Fix Me!
|
||||||
|
sql = "explain SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like '%asd');"
|
||||||
|
tdSql.error(sql, -2147473664) # Error: Planner internal error
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()
|
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()
|
||||||
self.test_fill_with_order_by()
|
self.test_fill_with_order_by()
|
||||||
self.test_fill_with_order_by2()
|
self.test_fill_with_order_by2()
|
||||||
|
self.test_fill_with_complex_expr()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue