Merge remote-tracking branch 'origin/3.0' into feature/check
This commit is contained in:
commit
bf79e04e48
|
@ -1,6 +1,6 @@
|
||||||
cmake_minimum_required(VERSION 3.16)
|
cmake_minimum_required(VERSION 3.16)
|
||||||
|
|
||||||
set(CMAKE_VERBOSE_MAKEFILE ON)
|
set(CMAKE_VERBOSE_MAKEFILE OFF)
|
||||||
|
|
||||||
#set output directory
|
#set output directory
|
||||||
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
|
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
|
||||||
|
|
|
@ -46,7 +46,7 @@ typedef struct SScanLogicNode {
|
||||||
struct STableMeta* pMeta;
|
struct STableMeta* pMeta;
|
||||||
SVgroupsInfo* pVgroupList;
|
SVgroupsInfo* pVgroupList;
|
||||||
EScanType scanType;
|
EScanType scanType;
|
||||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
|
||||||
STimeWindow scanRange;
|
STimeWindow scanRange;
|
||||||
SName tableName;
|
SName tableName;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
|
@ -189,9 +189,6 @@ typedef struct SScanPhysiNode {
|
||||||
SNodeList* pScanCols;
|
SNodeList* pScanCols;
|
||||||
uint64_t uid; // unique id of the table
|
uint64_t uid; // unique id of the table
|
||||||
int8_t tableType;
|
int8_t tableType;
|
||||||
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
|
|
||||||
int32_t count; // repeat count
|
|
||||||
int32_t reverse; // reverse scan count
|
|
||||||
SName tableName;
|
SName tableName;
|
||||||
} SScanPhysiNode;
|
} SScanPhysiNode;
|
||||||
|
|
||||||
|
@ -207,7 +204,7 @@ typedef struct SSystemTableScanPhysiNode {
|
||||||
|
|
||||||
typedef struct STableScanPhysiNode {
|
typedef struct STableScanPhysiNode {
|
||||||
SScanPhysiNode scan;
|
SScanPhysiNode scan;
|
||||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
|
||||||
STimeWindow scanRange;
|
STimeWindow scanRange;
|
||||||
double ratio;
|
double ratio;
|
||||||
int32_t dataRequired;
|
int32_t dataRequired;
|
||||||
|
|
|
@ -37,6 +37,8 @@ typedef struct SPlanContext {
|
||||||
bool isStmtQuery;
|
bool isStmtQuery;
|
||||||
void* pTransporter;
|
void* pTransporter;
|
||||||
struct SCatalog* pCatalog;
|
struct SCatalog* pCatalog;
|
||||||
|
char* pMsg;
|
||||||
|
int32_t msgLen;
|
||||||
} SPlanContext;
|
} SPlanContext;
|
||||||
|
|
||||||
// Create the physical plan for the query, according to the AST.
|
// Create the physical plan for the query, according to the AST.
|
||||||
|
|
|
@ -623,6 +623,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636)
|
#define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636)
|
||||||
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
|
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
|
||||||
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
|
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
|
||||||
|
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
|
||||||
|
|
||||||
//planner
|
//planner
|
||||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||||
|
|
|
@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
||||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||||
.pAstRoot = pQuery->pRoot,
|
.pAstRoot = pQuery->pRoot,
|
||||||
.showRewrite = pQuery->showRewrite,
|
.showRewrite = pQuery->showRewrite,
|
||||||
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter
|
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
|
||||||
|
.pMsg = pRequest->msgBuf,
|
||||||
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
||||||
};
|
};
|
||||||
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -146,10 +146,10 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
|
||||||
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
|
||||||
} else {
|
} else {
|
||||||
dDebug("node:%s, has been created", pWrapper->name);
|
dDebug("node:%s, has been created", pWrapper->name);
|
||||||
|
(void)dmOpenNode(pWrapper);
|
||||||
pWrapper->required = true;
|
pWrapper->required = true;
|
||||||
pWrapper->deployed = true;
|
pWrapper->deployed = true;
|
||||||
pWrapper->procType = pDnode->ptype;
|
pWrapper->procType = pDnode->ptype;
|
||||||
(void)dmOpenNode(pWrapper);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pDnode->mutex);
|
taosThreadMutexUnlock(&pDnode->mutex);
|
||||||
|
@ -171,13 +171,13 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
|
||||||
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
|
||||||
} else {
|
} else {
|
||||||
dDebug("node:%s, has been dropped", pWrapper->name);
|
dDebug("node:%s, has been dropped", pWrapper->name);
|
||||||
|
pWrapper->required = false;
|
||||||
|
pWrapper->deployed = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
dmReleaseWrapper(pWrapper);
|
dmReleaseWrapper(pWrapper);
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pWrapper->required = false;
|
|
||||||
pWrapper->deployed = false;
|
|
||||||
dmCloseNode(pWrapper);
|
dmCloseNode(pWrapper);
|
||||||
taosRemoveDir(pWrapper->path);
|
taosRemoveDir(pWrapper->path);
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,12 +71,15 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
|
||||||
SNodeMsg *pMsg = NULL;
|
SNodeMsg *pMsg = NULL;
|
||||||
NodeMsgFp msgFp = NULL;
|
NodeMsgFp msgFp = NULL;
|
||||||
uint16_t msgType = pRpc->msgType;
|
uint16_t msgType = pRpc->msgType;
|
||||||
|
bool needRelease = false;
|
||||||
|
|
||||||
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
|
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
|
||||||
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
|
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
|
if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
|
||||||
|
|
||||||
|
needRelease = true;
|
||||||
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
|
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
|
||||||
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
|
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
|
||||||
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
|
||||||
|
@ -119,7 +122,9 @@ _OVER:
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pRpc->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
dmReleaseWrapper(pWrapper);
|
if (needRelease) {
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
|
@ -1510,7 +1510,6 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
|
||||||
}
|
}
|
||||||
|
|
||||||
STSma *pTSma = pItem->pSma;
|
STSma *pTSma = pItem->pSma;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
STSmaReadH tReadH = {0};
|
STSmaReadH tReadH = {0};
|
||||||
|
|
|
@ -344,11 +344,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count);
|
|
||||||
if (pTagScanNode->reverse) {
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse);
|
|
||||||
}
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
@ -361,10 +356,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order));
|
|
||||||
EXPLAIN_ROW_END();
|
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
|
||||||
|
|
||||||
if (pResNode->pExecInfo) {
|
if (pResNode->pExecInfo) {
|
||||||
QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
if (tlen) {
|
if (tlen) {
|
||||||
|
@ -388,11 +379,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count);
|
|
||||||
if (pTblScanNode->scan.reverse) {
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse);
|
|
||||||
}
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
@ -405,10 +391,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order));
|
|
||||||
EXPLAIN_ROW_END();
|
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
|
||||||
|
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
@ -434,11 +416,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count);
|
|
||||||
if (pSTblScanNode->scan.reverse) {
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse);
|
|
||||||
}
|
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
@ -451,10 +428,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order));
|
|
||||||
EXPLAIN_ROW_END();
|
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
|
||||||
|
|
||||||
if (pSTblScanNode->scan.node.pConditions) {
|
if (pSTblScanNode->scan.node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pSTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pSTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
|
|
|
@ -1121,6 +1121,10 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
// todo avoid case: top(k, 12), 12 is the value parameter.
|
// todo avoid case: top(k, 12), 12 is the value parameter.
|
||||||
// sum(11), 11 is also the value parameter.
|
// sum(11), 11 is also the value parameter.
|
||||||
if (createDummyCol && pOneExpr->base.numOfParams == 1) {
|
if (createDummyCol && pOneExpr->base.numOfParams == 1) {
|
||||||
|
pInput->totalRows = pBlock->info.rows;
|
||||||
|
pInput->numOfRows = pBlock->info.rows;
|
||||||
|
pInput->startRowIndex = 0;
|
||||||
|
|
||||||
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
|
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -6571,9 +6575,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
.offset = pTableScanNode->offset,
|
.offset = pTableScanNode->offset,
|
||||||
};
|
};
|
||||||
|
|
||||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired,
|
return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC,
|
||||||
pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock,
|
numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList,
|
||||||
pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||||
|
@ -6721,7 +6725,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa
|
||||||
void* readHandle, uint64_t queryId, uint64_t taskId) {
|
void* readHandle, uint64_t queryId, uint64_t taskId) {
|
||||||
STsdbQueryCond cond = {.loadExternalRows = false};
|
STsdbQueryCond cond = {.loadExternalRows = false};
|
||||||
|
|
||||||
cond.order = pTableScanNode->scan.order;
|
cond.order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
||||||
cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
|
cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||||
if (cond.colList == NULL) {
|
if (cond.colList == NULL) {
|
||||||
|
|
|
@ -40,6 +40,11 @@ bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t minFunction(SqlFunctionCtx* pCtx);
|
int32_t minFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
|
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
||||||
|
|
||||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
|
@ -104,7 +104,7 @@ static int32_t translateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
}
|
}
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,6 +479,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = stddevFunction,
|
.processFunc = stddevFunction,
|
||||||
.finalizeFunc = stddevFinalize
|
.finalizeFunc = stddevFinalize
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "avg",
|
||||||
|
.type = FUNCTION_TYPE_AVG,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateInNumOutDou,
|
||||||
|
.getEnvFunc = getAvgFuncEnv,
|
||||||
|
.initFunc = avgFunctionSetup,
|
||||||
|
.processFunc = avgFunction,
|
||||||
|
.finalizeFunc = avgFinalize
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "percentile",
|
.name = "percentile",
|
||||||
.type = FUNCTION_TYPE_PERCENTILE,
|
.type = FUNCTION_TYPE_PERCENTILE,
|
||||||
|
|
|
@ -20,6 +20,59 @@
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
|
||||||
|
typedef struct SSumRes {
|
||||||
|
union {
|
||||||
|
int64_t isum;
|
||||||
|
uint64_t usum;
|
||||||
|
double dsum;
|
||||||
|
};
|
||||||
|
} SSumRes;
|
||||||
|
|
||||||
|
typedef struct SAvgRes {
|
||||||
|
double result;
|
||||||
|
SSumRes sum;
|
||||||
|
int64_t count;
|
||||||
|
} SAvgRes;
|
||||||
|
|
||||||
|
typedef struct STopBotResItem {
|
||||||
|
SVariant v;
|
||||||
|
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
||||||
|
struct {
|
||||||
|
int32_t pageId;
|
||||||
|
int32_t offset;
|
||||||
|
} tuplePos; // tuple data of this chosen row
|
||||||
|
} STopBotResItem;
|
||||||
|
|
||||||
|
typedef struct STopBotRes {
|
||||||
|
int32_t pageId;
|
||||||
|
// int32_t num;
|
||||||
|
STopBotResItem *pItems;
|
||||||
|
} STopBotRes;
|
||||||
|
|
||||||
|
typedef struct SStddevRes {
|
||||||
|
double result;
|
||||||
|
int64_t count;
|
||||||
|
union {double quadraticDSum; int64_t quadraticISum;};
|
||||||
|
union {double dsum; int64_t isum;};
|
||||||
|
} SStddevRes;
|
||||||
|
|
||||||
|
typedef struct SPercentileInfo {
|
||||||
|
double result;
|
||||||
|
tMemBucket *pMemBucket;
|
||||||
|
int32_t stage;
|
||||||
|
double minval;
|
||||||
|
double maxval;
|
||||||
|
int64_t numOfElems;
|
||||||
|
} SPercentileInfo;
|
||||||
|
|
||||||
|
typedef struct SDiffInfo {
|
||||||
|
bool hasPrev;
|
||||||
|
bool includeNull;
|
||||||
|
bool ignoreNegative;
|
||||||
|
bool firstOutput;
|
||||||
|
union { int64_t i64; double d64;} prev;
|
||||||
|
} SDiffInfo;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
if ((numOfElem) <= 0) { \
|
if ((numOfElem) <= 0) { \
|
||||||
|
@ -28,13 +81,50 @@
|
||||||
(_info)->numOfRes = (res); \
|
(_info)->numOfRes = (res); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
typedef struct SSumRes {
|
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||||
union {
|
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||||
int64_t isum;
|
|
||||||
uint64_t usum;
|
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||||
double dsum;
|
do { \
|
||||||
};
|
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||||
} SSumRes;
|
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||||
|
__ctx->fpSet.process(__ctx); \
|
||||||
|
} \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||||
|
do { \
|
||||||
|
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
||||||
|
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||||
|
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||||
|
__ctx->tag.i = (ts); \
|
||||||
|
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||||
|
} \
|
||||||
|
__ctx->fpSet.process(__ctx); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||||
|
do { \
|
||||||
|
if (((left) < (right)) ^ (sign)) { \
|
||||||
|
(left) = (right); \
|
||||||
|
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||||
|
(num) += 1; \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||||
|
do { \
|
||||||
|
_t *d = (_t *)((_col)->pData); \
|
||||||
|
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||||
|
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||||
|
continue; \
|
||||||
|
} \
|
||||||
|
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
|
||||||
|
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (pResultInfo->initialized) {
|
if (pResultInfo->initialized) {
|
||||||
|
@ -135,7 +225,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) {
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
if (pInput->colDataAggIsSet) {
|
if (pInput->colDataAggIsSet) {
|
||||||
numOfElem = pInput->numOfRows - pAgg->numOfNull;
|
numOfElem = pInput->numOfRows - pAgg->numOfNull;
|
||||||
ASSERT(numOfElem >= 0);
|
ASSERT(numOfElem >= 0);
|
||||||
|
@ -190,6 +280,145 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(double);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
memset(pRes, 0, sizeof(SAvgRes));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
|
// Only the pre-computing information loaded and actual data does not loaded
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
|
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
// computing based on the true data block
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int8_t* plist = (int8_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
int16_t* plist = (int16_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
int32_t* plist = (int32_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
int64_t* plist = (int64_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float* plist = (float*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.dsum += plist[i];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double* plist = (double*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.dsum += plist[i];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// data in the check operation are all null, not output
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
if (IS_INTEGER_TYPE(type)) {
|
||||||
|
pAvgRes->result = pAvgRes->sum.isum / ((double) pAvgRes->count);
|
||||||
|
} else {
|
||||||
|
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
|
||||||
|
}
|
||||||
|
|
||||||
|
return functionFinalize(pCtx, pBlock, slotId);
|
||||||
|
}
|
||||||
|
|
||||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
||||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||||
}
|
}
|
||||||
|
@ -292,49 +521,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
|
||||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
|
||||||
|
|
||||||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
|
||||||
do { \
|
|
||||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
|
||||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
|
||||||
__ctx->fpSet.process(__ctx); \
|
|
||||||
} \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
|
||||||
do { \
|
|
||||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
|
||||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
|
||||||
__ctx->tag.i = (ts); \
|
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
|
||||||
} \
|
|
||||||
__ctx->fpSet.process(__ctx); \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
|
||||||
do { \
|
|
||||||
if (((left) < (right)) ^ (sign)) { \
|
|
||||||
(left) = (right); \
|
|
||||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
|
||||||
(num) += 1; \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
|
||||||
do { \
|
|
||||||
_t *d = (_t *)((_col)->pData); \
|
|
||||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
|
||||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
|
||||||
continue; \
|
|
||||||
} \
|
|
||||||
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
|
|
||||||
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
@ -479,13 +665,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SStddevRes {
|
|
||||||
double result;
|
|
||||||
int64_t count;
|
|
||||||
union {double quadraticDSum; int64_t quadraticISum;};
|
|
||||||
union {double dsum; int64_t isum;};
|
|
||||||
} SStddevRes;
|
|
||||||
|
|
||||||
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SStddevRes);
|
pEnv->calcMemSize = sizeof(SStddevRes);
|
||||||
return true;
|
return true;
|
||||||
|
@ -588,8 +767,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pStddevRes->count += 1;
|
pStddevRes->count += 1;
|
||||||
pStddevRes->isum += plist[i];
|
pStddevRes->dsum += plist[i];
|
||||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
pStddevRes->quadraticDSum += plist[i] * plist[i];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -603,8 +782,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
numOfElem += 1;
|
numOfElem += 1;
|
||||||
pStddevRes->count += 1;
|
pStddevRes->count += 1;
|
||||||
pStddevRes->isum += plist[i];
|
pStddevRes->dsum += plist[i];
|
||||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
pStddevRes->quadraticDSum += plist[i] * plist[i];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -619,21 +798,21 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
double avg = pStddevRes->isum / ((double) pStddevRes->count);
|
double avg;
|
||||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
if (IS_INTEGER_TYPE(type)) {
|
||||||
|
avg = pStddevRes->isum / ((double) pStddevRes->count);
|
||||||
|
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
||||||
|
} else {
|
||||||
|
avg = pStddevRes->dsum / ((double) pStddevRes->count);
|
||||||
|
pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg);
|
||||||
|
}
|
||||||
|
|
||||||
return functionFinalize(pCtx, pBlock, slotId);
|
return functionFinalize(pCtx, pBlock, slotId);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SPercentileInfo {
|
|
||||||
double result;
|
|
||||||
tMemBucket *pMemBucket;
|
|
||||||
int32_t stage;
|
|
||||||
double minval;
|
|
||||||
double maxval;
|
|
||||||
int64_t numOfElems;
|
|
||||||
} SPercentileInfo;
|
|
||||||
|
|
||||||
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SPercentileInfo);
|
pEnv->calcMemSize = sizeof(SPercentileInfo);
|
||||||
return true;
|
return true;
|
||||||
|
@ -928,14 +1107,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SDiffInfo {
|
|
||||||
bool hasPrev;
|
|
||||||
bool includeNull;
|
|
||||||
bool ignoreNegative;
|
|
||||||
bool firstOutput;
|
|
||||||
union { int64_t i64; double d64;} prev;
|
|
||||||
} SDiffInfo;
|
|
||||||
|
|
||||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||||
return true;
|
return true;
|
||||||
|
@ -1168,21 +1339,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct STopBotResItem {
|
|
||||||
SVariant v;
|
|
||||||
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
|
||||||
struct {
|
|
||||||
int32_t pageId;
|
|
||||||
int32_t offset;
|
|
||||||
} tuplePos; // tuple data of this chosen row
|
|
||||||
} STopBotResItem;
|
|
||||||
|
|
||||||
typedef struct STopBotRes {
|
|
||||||
int32_t pageId;
|
|
||||||
// int32_t num;
|
|
||||||
STopBotResItem *pItems;
|
|
||||||
} STopBotRes;
|
|
||||||
|
|
||||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
|
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
|
||||||
|
@ -1335,4 +1491,4 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
|
||||||
return pEntryInfo->numOfRes;
|
return pEntryInfo->numOfRes;
|
||||||
|
|
||||||
// return functionFinalize(pCtx, pBlock, slotId);
|
// return functionFinalize(pCtx, pBlock, slotId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -671,9 +671,6 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
|
||||||
static const char* jkScanPhysiPlanScanCols = "ScanCols";
|
static const char* jkScanPhysiPlanScanCols = "ScanCols";
|
||||||
static const char* jkScanPhysiPlanTableId = "TableId";
|
static const char* jkScanPhysiPlanTableId = "TableId";
|
||||||
static const char* jkScanPhysiPlanTableType = "TableType";
|
static const char* jkScanPhysiPlanTableType = "TableType";
|
||||||
static const char* jkScanPhysiPlanScanOrder = "ScanOrder";
|
|
||||||
static const char* jkScanPhysiPlanScanCount = "ScanCount";
|
|
||||||
static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount";
|
|
||||||
static const char* jkScanPhysiPlanTableName = "TableName";
|
static const char* jkScanPhysiPlanTableName = "TableName";
|
||||||
|
|
||||||
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
|
@ -689,15 +686,6 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
|
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
|
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
|
||||||
}
|
}
|
||||||
|
@ -718,15 +706,6 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
|
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanOrder, &pNode->order);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanCount, &pNode->count);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = tjsonGetIntValue(pJson, jkScanPhysiPlanReverseScanCount, &pNode->reverse);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
|
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
|
||||||
}
|
}
|
||||||
|
@ -742,7 +721,8 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
|
||||||
return jsonToPhysiScanNode(pJson, pObj);
|
return jsonToPhysiScanNode(pJson, pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag";
|
static const char* jkTableScanPhysiPlanScanCount = "ScanCount";
|
||||||
|
static const char* jkTableScanPhysiPlanReverseScanCount = "ReverseScanCount";
|
||||||
static const char* jkTableScanPhysiPlanStartKey = "StartKey";
|
static const char* jkTableScanPhysiPlanStartKey = "StartKey";
|
||||||
static const char* jkTableScanPhysiPlanEndKey = "EndKey";
|
static const char* jkTableScanPhysiPlanEndKey = "EndKey";
|
||||||
static const char* jkTableScanPhysiPlanRatio = "Ratio";
|
static const char* jkTableScanPhysiPlanRatio = "Ratio";
|
||||||
|
@ -759,7 +739,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
|
|
||||||
int32_t code = physiScanNodeToJson(pObj, pJson);
|
int32_t code = physiScanNodeToJson(pObj, pJson);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag);
|
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanCount, pNode->scanSeq[0]);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanReverseScanCount, pNode->scanSeq[1]);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey);
|
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey);
|
||||||
|
@ -800,7 +783,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
||||||
|
|
||||||
int32_t code = jsonToPhysiScanNode(pJson, pObj);
|
int32_t code = jsonToPhysiScanNode(pJson, pObj);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanFlag, &pNode->scanFlag);
|
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanCount, &pNode->scanSeq[0]);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanReverseScanCount, &pNode->scanSeq[1]);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->scanRange.skey);
|
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->scanRange.skey);
|
||||||
|
|
|
@ -611,6 +611,7 @@ function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D).
|
||||||
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
|
||||||
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
|
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
|
||||||
function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); }
|
function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); }
|
||||||
|
//function_expression(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||||
|
|
||||||
%type noarg_func { SToken }
|
%type noarg_func { SToken }
|
||||||
%destructor noarg_func { }
|
%destructor noarg_func { }
|
||||||
|
|
|
@ -83,7 +83,7 @@ static EDealRes calcConstOperator(SOperatorNode** pNode, void* pContext) {
|
||||||
|
|
||||||
static EDealRes calcConstFunction(SFunctionNode** pNode, void* pContext) {
|
static EDealRes calcConstFunction(SFunctionNode** pNode, void* pContext) {
|
||||||
SFunctionNode* pFunc = *pNode;
|
SFunctionNode* pFunc = *pNode;
|
||||||
if (!fmIsScalarFunc(pFunc->funcId)) {
|
if (!fmIsScalarFunc(pFunc->funcId) || fmIsUserDefinedFunc(pFunc->funcId)) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
SNode* pParam = NULL;
|
SNode* pParam = NULL;
|
||||||
|
|
|
@ -2616,37 +2616,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
|
||||||
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
|
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
|
||||||
SCMCreateTopicReq createReq = {0};
|
SName name;
|
||||||
|
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
|
||||||
|
// tNameGetFullDbName(&name, pReq->name);
|
||||||
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);
|
||||||
|
pReq->igExists = pStmt->ignoreExists;
|
||||||
|
pReq->withTbName = pStmt->pOptions->withTable;
|
||||||
|
pReq->withSchema = pStmt->pOptions->withSchema;
|
||||||
|
pReq->withTag = pStmt->pOptions->withTag;
|
||||||
|
|
||||||
if (NULL != pStmt->pQuery) {
|
pReq->sql = strdup(pCxt->pParseCxt->pSql);
|
||||||
pCxt->pParseCxt->topicQuery = true;
|
if (NULL == pReq->sql) {
|
||||||
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
strcpy(createReq.subscribeDbName, pStmt->subscribeDbName);
|
|
||||||
}
|
|
||||||
|
|
||||||
createReq.sql = strdup(pCxt->pParseCxt->pSql);
|
|
||||||
if (NULL == createReq.sql) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SName name;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
|
|
||||||
// tNameGetFullDbName(&name, createReq.name);
|
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), createReq.name);
|
|
||||||
createReq.igExists = pStmt->ignoreExists;
|
|
||||||
createReq.withTbName = pStmt->pOptions->withTable;
|
|
||||||
createReq.withSchema = pStmt->pOptions->withSchema;
|
|
||||||
createReq.withTag = pStmt->pOptions->withTag;
|
|
||||||
|
|
||||||
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
|
if (NULL != pStmt->pQuery) {
|
||||||
|
strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName);
|
||||||
|
pCxt->pParseCxt->topicQuery = true;
|
||||||
|
code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
strcpy(pReq->subscribeDbName, pStmt->subscribeDbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
||||||
|
if (NULL == pStmt->pQuery) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList &&
|
||||||
|
NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
||||||
|
SCMCreateTopicReq createReq = {0};
|
||||||
|
int32_t code = checkCreateTopic(pCxt, pStmt);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildCreateTopicReq(pCxt, pStmt, &createReq);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
|
||||||
|
}
|
||||||
tFreeSCMCreateTopicReq(&createReq);
|
tFreeSCMCreateTopicReq(&createReq);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
||||||
return "soffset/offset can not be less than 0";
|
return "soffset/offset can not be less than 0";
|
||||||
case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY:
|
case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY:
|
||||||
return "slimit/soffset only available for PARTITION BY query";
|
return "slimit/soffset only available for PARTITION BY query";
|
||||||
|
case TSDB_CODE_PAR_INVALID_TOPIC_QUERY:
|
||||||
|
return "Invalid topic query";
|
||||||
case TSDB_CODE_OUT_OF_MEMORY:
|
case TSDB_CODE_OUT_OF_MEMORY:
|
||||||
return "Out of memory";
|
return "Out of memory";
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -199,7 +199,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
|
|
||||||
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
|
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
|
||||||
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
|
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
|
||||||
pScan->scanFlag = MAIN_SCAN;
|
pScan->scanSeq[0] = 1;
|
||||||
|
pScan->scanSeq[1] = 0;
|
||||||
pScan->scanRange = TSWINDOW_INITIALIZER;
|
pScan->scanRange = TSWINDOW_INITIALIZER;
|
||||||
pScan->tableName.type = TSDB_TABLE_NAME_T;
|
pScan->tableName.type = TSDB_TABLE_NAME_T;
|
||||||
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
|
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
|
||||||
|
|
|
@ -20,11 +20,14 @@
|
||||||
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
|
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
|
||||||
|
|
||||||
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
|
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
|
||||||
|
#define OPTIMIZE_FLAG_CPD OPTIMIZE_FLAG_MASK(1)
|
||||||
|
#define OPTIMIZE_FLAG_OPK OPTIMIZE_FLAG_MASK(2)
|
||||||
|
|
||||||
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
||||||
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||||
|
|
||||||
typedef struct SOptimizeContext {
|
typedef struct SOptimizeContext {
|
||||||
|
SPlanContext* pPlanCxt;
|
||||||
bool optimized;
|
bool optimized;
|
||||||
} SOptimizeContext;
|
} SOptimizeContext;
|
||||||
|
|
||||||
|
@ -57,7 +60,23 @@ typedef enum ECondAction {
|
||||||
// after supporting outer join, there are other possibilities
|
// after supporting outer join, there are other possibilities
|
||||||
} ECondAction;
|
} ECondAction;
|
||||||
|
|
||||||
EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
|
typedef bool (*FMayBeOptimized)(SLogicNode* pNode);
|
||||||
|
|
||||||
|
static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) {
|
||||||
|
if (func(pNode)) {
|
||||||
|
return pNode;
|
||||||
|
}
|
||||||
|
SNode* pChild;
|
||||||
|
FOREACH(pChild, pNode->pChildren) {
|
||||||
|
SLogicNode* pScanNode = optFindPossibleNode((SLogicNode*)pChild, func);
|
||||||
|
if (NULL != pScanNode) {
|
||||||
|
return pScanNode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
EDealRes osdHaveNormalColImpl(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
*((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
|
*((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
|
||||||
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
||||||
|
@ -65,9 +84,9 @@ EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool haveNormalCol(SNodeList* pList) {
|
static bool osdHaveNormalCol(SNodeList* pList) {
|
||||||
bool res = false;
|
bool res = false;
|
||||||
nodesWalkExprsPostOrder(pList, haveNormalColImpl, &res);
|
nodesWalkExprsPostOrder(pList, osdHaveNormalColImpl, &res);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,21 +108,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
|
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
|
||||||
return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
|
return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
|
||||||
}
|
}
|
||||||
return !haveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
|
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
|
||||||
}
|
|
||||||
|
|
||||||
static SLogicNode* osdFindPossibleScanNode(SLogicNode* pNode) {
|
|
||||||
if (osdMayBeOptimized(pNode)) {
|
|
||||||
return pNode;
|
|
||||||
}
|
|
||||||
SNode* pChild;
|
|
||||||
FOREACH(pChild, pNode->pChildren) {
|
|
||||||
SLogicNode* pScanNode = osdFindPossibleScanNode((SLogicNode*)pChild);
|
|
||||||
if (NULL != pScanNode) {
|
|
||||||
return pScanNode;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNodeList* osdGetAllFuncs(SLogicNode* pNode) {
|
static SNodeList* osdGetAllFuncs(SLogicNode* pNode) {
|
||||||
|
@ -138,7 +143,7 @@ static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) {
|
static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) {
|
||||||
pInfo->pScan = (SScanLogicNode*)osdFindPossibleScanNode(pLogicNode);
|
pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, osdMayBeOptimized);
|
||||||
if (NULL == pInfo->pScan) {
|
if (NULL == pInfo->pScan) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -345,7 +350,7 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
|
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
|
||||||
if (NULL == pScan->node.pConditions) {
|
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,7 +364,10 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
|
||||||
pScan->node.pConditions = pOtherCond;
|
pScan->node.pConditions = pOtherCond;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
|
||||||
|
pCxt->optimized = true;
|
||||||
|
} else {
|
||||||
nodesDestroyNode(pPrimaryKeyCond);
|
nodesDestroyNode(pPrimaryKeyCond);
|
||||||
nodesDestroyNode(pOtherCond);
|
nodesDestroyNode(pOtherCond);
|
||||||
}
|
}
|
||||||
|
@ -367,7 +375,7 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
|
static bool cpdBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
|
||||||
SNode* pTableCol = NULL;
|
SNode* pTableCol = NULL;
|
||||||
FOREACH(pTableCol, pTableCols) {
|
FOREACH(pTableCol, pTableCols) {
|
||||||
if (nodesEqualNode(pCondCol, pTableCol)) {
|
if (nodesEqualNode(pCondCol, pTableCol)) {
|
||||||
|
@ -380,9 +388,9 @@ static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
|
||||||
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
|
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
|
||||||
SCpdIsMultiTableCondCxt* pCxt = pContext;
|
SCpdIsMultiTableCondCxt* pCxt = pContext;
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
if (belongThisTable(pNode, pCxt->pLeftCols)) {
|
if (cpdBelongThisTable(pNode, pCxt->pLeftCols)) {
|
||||||
pCxt->havaLeftCol = true;
|
pCxt->havaLeftCol = true;
|
||||||
} else if (belongThisTable(pNode, pCxt->pRightCols)) {
|
} else if (cpdBelongThisTable(pNode, pCxt->pRightCols)) {
|
||||||
pCxt->haveRightCol = true;
|
pCxt->haveRightCol = true;
|
||||||
}
|
}
|
||||||
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
|
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
|
||||||
|
@ -508,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) {
|
||||||
|
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return cpdBelongThisTable(pNode, pTableCols);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
|
||||||
|
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
|
||||||
|
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
|
||||||
|
SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions;
|
||||||
|
if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) {
|
||||||
|
return cpdIsPrimaryKey(pOper->pRight, pRightCols);
|
||||||
|
} else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) {
|
||||||
|
return cpdIsPrimaryKey(pOper->pRight, pLeftCols);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
|
||||||
|
if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
|
||||||
|
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
|
||||||
|
if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
|
||||||
|
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
SNode* pCond = NULL;
|
||||||
|
FOREACH(pCond, pOnCond->pParameterList) {
|
||||||
|
if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
|
||||||
|
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
|
if (NULL == pJoin->pOnConditions) {
|
||||||
|
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "not support cross join");
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
|
||||||
|
return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
|
||||||
|
} else {
|
||||||
|
return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
if (NULL == pJoin->node.pConditions) {
|
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL == pJoin->node.pConditions) {
|
||||||
|
return cpdCheckJoinOnCond(pCxt, pJoin);
|
||||||
|
}
|
||||||
|
|
||||||
SNode* pOnCond = NULL;
|
SNode* pOnCond = NULL;
|
||||||
SNode* pLeftChildCond = NULL;
|
SNode* pLeftChildCond = NULL;
|
||||||
SNode* pRightChildCond = NULL;
|
SNode* pRightChildCond = NULL;
|
||||||
|
@ -527,7 +600,11 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi
|
||||||
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
|
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
|
||||||
|
pCxt->optimized = true;
|
||||||
|
code = cpdCheckJoinOnCond(pCxt, pJoin);
|
||||||
|
} else {
|
||||||
nodesDestroyNode(pOnCond);
|
nodesDestroyNode(pOnCond);
|
||||||
nodesDestroyNode(pLeftChildCond);
|
nodesDestroyNode(pLeftChildCond);
|
||||||
nodesDestroyNode(pRightChildCond);
|
nodesDestroyNode(pRightChildCond);
|
||||||
|
@ -572,15 +649,129 @@ static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||||
return cpdPushCondition(pCxt, pLogicNode);
|
return cpdPushCondition(pCxt, pLogicNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) {
|
||||||
|
if (1 != LIST_LENGTH(pSortKeys)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SNode* pNode = ((SOrderByExprNode*)nodesListGetNode(pSortKeys, 0))->pExpr;
|
||||||
|
return (QUERY_NODE_COLUMN == nodeType(pNode) ? (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) : false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool opkSortMayBeOptimized(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_SORT != nodeType(pNode)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OPK)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
switch (nodeType(pNode)) {
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||||
|
return nodesListMakeAppend(pScanNodes, pNode);
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||||
|
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
|
*pNotOptimize = true;
|
||||||
|
return code;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (1 != LIST_LENGTH(pNode->pChildren)) {
|
||||||
|
*pNotOptimize = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t opkGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) {
|
||||||
|
bool notOptimize = false;
|
||||||
|
int32_t code = opkGetScanNodesImpl(pNode, ¬Optimize, pScanNodes);
|
||||||
|
if (TSDB_CODE_SUCCESS != code || notOptimize) {
|
||||||
|
nodesClearList(*pScanNodes);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static EOrder opkGetPrimaryKeyOrder(SSortLogicNode* pSort) {
|
||||||
|
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SNode* opkRewriteDownNode(SSortLogicNode* pSort) {
|
||||||
|
SNode* pDownNode = nodesListGetNode(pSort->node.pChildren, 0);
|
||||||
|
// todo
|
||||||
|
pSort->node.pChildren = NULL;
|
||||||
|
return pDownNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t opkDoOptimized(SOptimizeContext* pCxt, SSortLogicNode* pSort, SNodeList* pScanNodes) {
|
||||||
|
EOrder order = opkGetPrimaryKeyOrder(pSort);
|
||||||
|
if (ORDER_DESC == order) {
|
||||||
|
SNode* pScan = NULL;
|
||||||
|
FOREACH(pScan, pScanNodes) {
|
||||||
|
((SScanLogicNode*)pScan)->scanSeq[0] = 0;
|
||||||
|
((SScanLogicNode*)pScan)->scanSeq[1] = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pSort->node.pParent) {
|
||||||
|
// todo
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pDownNode = opkRewriteDownNode(pSort);
|
||||||
|
SNode* pNode;
|
||||||
|
FOREACH(pNode, pSort->node.pParent->pChildren) {
|
||||||
|
if (nodesEqualNode(pNode, pSort)) {
|
||||||
|
REPLACE_NODE(pDownNode);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nodesDestroyNode(pSort);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) {
|
||||||
|
OPTIMIZE_FLAG_SET_MASK(pSort->node.optimizedFlag, OPTIMIZE_FLAG_OPK);
|
||||||
|
if (!opkIsPrimaryKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
SNodeList* pScanNodes = NULL;
|
||||||
|
int32_t code = opkGetScanNodes(nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes);
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) {
|
||||||
|
code = opkDoOptimized(pCxt, pSort, pScanNodes);
|
||||||
|
}
|
||||||
|
nodesClearList(pScanNodes);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||||
|
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized);
|
||||||
|
if (NULL == pSort) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
return opkOptimizeImpl(pCxt, pSort);
|
||||||
|
}
|
||||||
|
|
||||||
static const SOptimizeRule optimizeRuleSet[] = {
|
static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{ .pName = "OptimizeScanData", .optimizeFunc = osdOptimize },
|
{ .pName = "OptimizeScanData", .optimizeFunc = osdOptimize },
|
||||||
{ .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize }
|
{ .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize },
|
||||||
|
{ .pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize }
|
||||||
};
|
};
|
||||||
|
|
||||||
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
|
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
|
||||||
|
|
||||||
static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
|
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
|
||||||
SOptimizeContext cxt = { .optimized = false };
|
SOptimizeContext cxt = { .pPlanCxt = pCxt, .optimized = false };
|
||||||
do {
|
do {
|
||||||
cxt.optimized = false;
|
cxt.optimized = false;
|
||||||
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
|
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
|
||||||
|
@ -594,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
|
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
|
||||||
return applyOptimizeRule(pLogicNode);
|
return applyOptimizeRule(pCxt, pLogicNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -398,9 +398,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
||||||
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
||||||
pScanPhysiNode->order = TSDB_ORDER_ASC;
|
|
||||||
pScanPhysiNode->count = 1;
|
|
||||||
pScanPhysiNode->reverse = 0;
|
|
||||||
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
|
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,7 +429,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScan->scanFlag = pScanLogicNode->scanFlag;
|
memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
|
||||||
pTableScan->scanRange = pScanLogicNode->scanRange;
|
pTableScan->scanRange = pScanLogicNode->scanRange;
|
||||||
pTableScan->ratio = pScanLogicNode->ratio;
|
pTableScan->ratio = pScanLogicNode->ratio;
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "planTestUtil.h"
|
||||||
|
#include "planner.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
class PlanOptimizeTest : public PlannerTestBase {
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("select * from t1 order by ts");
|
||||||
|
run("select * from t1 order by ts desc");
|
||||||
|
run("select c1 from t1 order by ts");
|
||||||
|
run("select c1 from t1 order by ts desc");
|
||||||
|
}
|
|
@ -1247,26 +1247,28 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
if (inputNum != 1) {
|
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI);
|
||||||
return TSDB_CODE_FAILED;
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
colDataAppendInt64(pOutput->columnData, i, &ts);
|
||||||
}
|
}
|
||||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
if (inputNum != 1) {
|
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI);
|
||||||
return TSDB_CODE_FAILED;
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
colDataAppendInt64(pOutput->columnData, i, &ts);
|
||||||
}
|
}
|
||||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
if (inputNum != 1) {
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
return TSDB_CODE_FAILED;
|
colDataAppend(pOutput->columnData, i, tsTimezoneStr, false);
|
||||||
}
|
}
|
||||||
colDataAppend(pOutput->columnData, pOutput->numOfRows, (char *)colDataGetData(pInput->columnData, 0), false);
|
pOutput->numOfRows = pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -292,7 +292,7 @@ print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $
|
||||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
sleep 1000
|
sleep 1000
|
||||||
goto wait_consumer_end_from_ctb
|
goto wait_consumer_end_from_ntb
|
||||||
endi
|
endi
|
||||||
if $data[0][1] == 0 then
|
if $data[0][1] == 0 then
|
||||||
if $data[1][1] != 1 then
|
if $data[1][1] != 1 then
|
||||||
|
|
|
@ -125,13 +125,13 @@ void saveConfigToLogFile() {
|
||||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||||
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
|
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
|
||||||
taosFprintfFile(g_fp, " Topics: ");
|
taosFprintfFile(g_fp, " Topics: ");
|
||||||
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
|
for (int j = 0 ; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
|
||||||
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
|
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
|
||||||
}
|
}
|
||||||
taosFprintfFile(g_fp, "\n");
|
taosFprintfFile(g_fp, "\n");
|
||||||
taosFprintfFile(g_fp, " Key: ");
|
taosFprintfFile(g_fp, " Key: ");
|
||||||
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
|
for (int k = 0 ; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
|
||||||
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
|
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
|
||||||
}
|
}
|
||||||
taosFprintfFile(g_fp, "\n");
|
taosFprintfFile(g_fp, "\n");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue