Merge pull request #14001 from taosdata/feature/3.0_debug_wxy
feat: sql function 'interp'
This commit is contained in:
commit
3f702941a6
|
@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode {
|
|||
SNodeList* pFuncs;
|
||||
STimeWindow timeRange;
|
||||
int64_t interval;
|
||||
EFillMode fillMode;
|
||||
SNode* pFillValues; // SNodeListNode
|
||||
SNode* pTimeSeries; // SColumnNode
|
||||
} SInterpFuncLogicNode;
|
||||
|
||||
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
|
||||
|
@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode {
|
|||
SNodeList* pFuncs;
|
||||
STimeWindow timeRange;
|
||||
int64_t interval;
|
||||
EFillMode fillMode;
|
||||
SNode* pFillValues; // SNodeListNode
|
||||
SNode* pTimeSeries; // SColumnNode
|
||||
} SInterpFuncPhysiNode;
|
||||
|
||||
typedef struct SJoinPhysiNode {
|
||||
|
|
|
@ -564,6 +564,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A)
|
||||
#define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B)
|
||||
#define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C)
|
||||
#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D)
|
||||
|
||||
//planner
|
||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||
|
|
|
@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL
|
|||
CLONE_NODE_LIST_FIELD(pFuncs);
|
||||
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
|
||||
COPY_SCALAR_FIELD(interval);
|
||||
COPY_SCALAR_FIELD(fillMode);
|
||||
CLONE_NODE_FIELD(pFillValues);
|
||||
CLONE_NODE_FIELD(pTimeSeries);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
|
|
|
@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs";
|
|||
static const char* jkInterpFuncPhysiPlanStartTime = "StartTime";
|
||||
static const char* jkInterpFuncPhysiPlanEndTime = "EndTime";
|
||||
static const char* jkInterpFuncPhysiPlanInterval = "Interval";
|
||||
static const char* jkInterpFuncPhysiPlanFillMode = "FillMode";
|
||||
static const char* jkInterpFuncPhysiPlanFillValues = "FillValues";
|
||||
static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries";
|
||||
|
||||
static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj;
|
||||
|
@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) {
|
|||
|
||||
SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||
((SSelectStmt*)pStmt)->pFill = pFill;
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) {
|
||||
SFillNode* pFillClause = (SFillNode*)pFill;
|
||||
nodesDestroyNode(pFillClause->pWStartTs);
|
||||
pFillClause->pWStartTs = createPrimaryKeyCol(pCxt);
|
||||
((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause;
|
||||
}
|
||||
return pStmt;
|
||||
}
|
||||
|
|
|
@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
|
||||
SFillNode* pFill = (SFillNode*)pInterval->pFill;
|
||||
static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) {
|
||||
if (FILL_MODE_NONE == pFill->mode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) ||
|
||||
TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
|
||||
}
|
||||
|
||||
// interp FILL clause
|
||||
if (NULL == pInterval) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey);
|
||||
int64_t intervalRange = 0;
|
||||
SValueNode* pInter = (SValueNode*)pInterval->pInterval;
|
||||
if (TIME_IS_VAR_DURATION(pInter->unit)) {
|
||||
if (TIME_IS_VAR_DURATION(pInterval->unit)) {
|
||||
int64_t f = 1;
|
||||
if (pInter->unit == 'n') {
|
||||
if (pInterval->unit == 'n') {
|
||||
f = 30L * MILLISECOND_PER_DAY;
|
||||
} else if (pInter->unit == 'y') {
|
||||
} else if (pInterval->unit == 'y') {
|
||||
f = 365L * MILLISECOND_PER_DAY;
|
||||
}
|
||||
intervalRange = pInter->datum.i * f;
|
||||
intervalRange = pInterval->datum.i * f;
|
||||
} else {
|
||||
intervalRange = pInter->datum.i;
|
||||
intervalRange = pInterval->datum.i;
|
||||
}
|
||||
if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
|
||||
|
@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi
|
|||
|
||||
int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkFill(pCxt, pInterval);
|
||||
code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) {
|
||||
SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL);
|
||||
if (NULL == pFill) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pFill->mode = FILL_MODE_NONE;
|
||||
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
nodesDestroyNode((SNode*)pFill);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME);
|
||||
pFill->pWStartTs = (SNode*)pCol;
|
||||
|
||||
*pOutput = (SNode*)pFill;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (NULL == pSelect->pFill) {
|
||||
code = createDefaultFillNode(pCxt, &pSelect->pFill);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, &pSelect->pFill);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (!pSelect->hasInterpFunc) {
|
||||
if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = translateExpr(pCxt, &pSelect->pRange);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, &pSelect->pEvery);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateInterpFill(pCxt, pSelect);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {
|
||||
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
|
||||
return translateExprList(pCxt, pPartitionByList);
|
||||
|
@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkLimit(pCxt, pSelect);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateInterp(pCxt, pSelect);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteUniqueStmt(pCxt, pSelect);
|
||||
}
|
||||
|
@ -5467,8 +5535,7 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->newCommentLen = strlen(pReq->newComment);
|
||||
}
|
||||
else{
|
||||
} else {
|
||||
pReq->newCommentLen = -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "%s function does not supportted in group query";
|
||||
case TSDB_CODE_PAR_INVALID_TABLE_OPTION:
|
||||
return "Invalid option %s";
|
||||
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
|
||||
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
|
@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) {
|
|||
|
||||
run("SELECT INTERP(c1) FROM t1 EVERY(5s)");
|
||||
|
||||
run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)");
|
||||
|
||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)");
|
||||
|
||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
|
||||
|
|
|
@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
|
|||
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) {
|
||||
// SRangeNode* pRange = (SRangeNode*)pSelect->pRange;
|
||||
// pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i;
|
||||
// pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i;
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) {
|
||||
SFillNode* pFill = (SFillNode*)pSelect->pFill;
|
||||
pInterpFunc->timeRange = pFill->timeRange;
|
||||
pInterpFunc->fillMode = pFill->mode;
|
||||
pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs);
|
||||
pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues);
|
||||
if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) {
|
||||
|
|
|
@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pInterpFunc->timeRange = pFuncLogicNode->timeRange;
|
||||
pInterpFunc->interval = pFuncLogicNode->interval;
|
||||
pInterpFunc->fillMode = pFuncLogicNode->fillMode;
|
||||
pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues);
|
||||
pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries);
|
||||
if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
Loading…
Reference in New Issue