[td-5881]<feature>: Sort the result according to any single column in the outer query result is allowed.
This commit is contained in:
parent
842098942b
commit
d82e04be8c
|
@ -29,15 +29,16 @@ extern "C" {
|
|||
#include "tsched.h"
|
||||
#include "tsclient.h"
|
||||
|
||||
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
|
||||
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
|
||||
|
||||
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
|
||||
|
||||
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
|
||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
|
||||
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
|
||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
|
||||
|
||||
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
||||
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
|
||||
|
||||
#pragma pack(push,1)
|
||||
|
|
|
@ -5747,14 +5747,19 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
|
|||
pQueryInfo->order.order = TSDB_ORDER_ASC;
|
||||
if (isTopBottomQuery(pQueryInfo)) {
|
||||
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
} else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column
|
||||
pQueryInfo->order.orderColId = INT32_MIN;
|
||||
} else { // in case of select tbname from super_table, the default order column can not be the primary ts column
|
||||
pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro
|
||||
}
|
||||
|
||||
/* for super table query, set default ascending order for group output */
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
|
||||
}
|
||||
|
||||
if (pQueryInfo->distinct) {
|
||||
pQueryInfo->order.order = TSDB_ORDER_ASC;
|
||||
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
|
||||
|
@ -5766,17 +5771,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
setDefaultOrderInfo(pQueryInfo);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
|
||||
if (pQueryInfo->distinct == true) {
|
||||
pQueryInfo->order.order = TSDB_ORDER_ASC;
|
||||
pQueryInfo->order.orderColId = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (pSqlNode->pSortOrder == NULL) {
|
||||
if (pQueryInfo->distinct || pSqlNode->pSortOrder == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pSortorder = pSqlNode->pSortOrder;
|
||||
char* pMsgBuf = tscGetErrorMsgPayload(pCmd);
|
||||
SArray* pSortOrder = pSqlNode->pSortOrder;
|
||||
|
||||
/*
|
||||
* for table query, there is only one or none order option is allowed, which is the
|
||||
|
@ -5784,19 +5784,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
*
|
||||
* for super table query, the order option must be less than 3.
|
||||
*/
|
||||
size_t size = taosArrayGetSize(pSortorder);
|
||||
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
|
||||
size_t size = taosArrayGetSize(pSortOrder);
|
||||
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) {
|
||||
if (size > 1) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0);
|
||||
return invalidOperationMsg(pMsgBuf, msg0);
|
||||
}
|
||||
} else {
|
||||
if (size > 2) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
return invalidOperationMsg(pMsgBuf, msg3);
|
||||
}
|
||||
}
|
||||
|
||||
// handle the first part of order by
|
||||
tVariant* pVar = taosArrayGet(pSortorder, 0);
|
||||
tVariant* pVar = taosArrayGet(pSortOrder, 0);
|
||||
|
||||
// e.g., order by 1 asc, return directly with out further check.
|
||||
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) {
|
||||
|
@ -5808,7 +5808,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
|
||||
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
return invalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
|
||||
bool orderByTags = false;
|
||||
|
@ -5820,7 +5820,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
|
||||
// it is a tag column
|
||||
if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
|
||||
if (relTagIndex == pColIndex->colIndex) {
|
||||
|
@ -5841,13 +5841,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
orderByGroupbyCol = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
return invalidOperationMsg(pMsgBuf, msg3);
|
||||
} else { // order by top/bottom result value column is not supported in case of interval query.
|
||||
assert(!(orderByTags && orderByTS && orderByGroupbyCol));
|
||||
}
|
||||
|
||||
size_t s = taosArrayGetSize(pSortorder);
|
||||
size_t s = taosArrayGetSize(pSortOrder);
|
||||
if (s == 1) {
|
||||
if (orderByTags) {
|
||||
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
|
@ -5866,7 +5867,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
|
||||
pExpr = tscExprGet(pQueryInfo, 1);
|
||||
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
|
||||
|
@ -5884,9 +5885,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (s == 2) {
|
||||
} else {
|
||||
tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
|
||||
if (orderByTags) {
|
||||
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
|
@ -5903,22 +5902,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
tVariant* pVar2 = &pItem->pVar;
|
||||
SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz};
|
||||
if (getColumnIndexByName(&cname, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
return invalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
|
||||
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidOperationMsg(pMsgBuf, msg2);
|
||||
} else {
|
||||
tVariantListItem* p1 = taosArrayGet(pSortorder, 1);
|
||||
tVariantListItem* p1 = taosArrayGet(pSortOrder, 1);
|
||||
pQueryInfo->order.order = p1->sortOrder;
|
||||
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
}
|
||||
}
|
||||
|
||||
} else { // meter query
|
||||
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
} else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table
|
||||
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
|
||||
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) {
|
||||
bool validOrder = false;
|
||||
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
|
||||
|
@ -5926,13 +5926,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
|
||||
validOrder = (pColIndex->colIndex == index.columnIndex);
|
||||
}
|
||||
|
||||
if (!validOrder) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
|
||||
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
|
||||
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
|
||||
|
||||
}
|
||||
|
||||
if (isTopBottomQuery(pQueryInfo)) {
|
||||
|
@ -5948,13 +5949,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
|
||||
pExpr = tscExprGet(pQueryInfo, 1);
|
||||
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
validOrder = true;
|
||||
}
|
||||
|
||||
if (!validOrder) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
return invalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
|
||||
|
@ -5964,6 +5966,18 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
|
||||
pQueryInfo->order.order = pItem->sortOrder;
|
||||
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
|
||||
} else {
|
||||
// handle the temp table order by clause. You can order by any single column in case of the temp table, created by
|
||||
// inner subquery.
|
||||
assert(UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo) && taosArrayGetSize(pSqlNode->pSortOrder) == 1);
|
||||
|
||||
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
|
||||
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
|
||||
pQueryInfo->order.order = pItem->sortOrder;
|
||||
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
|
||||
|
@ -8736,8 +8750,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
const char* msg8 = "condition missing for join query";
|
||||
const char* msg9 = "not support 3 level select";
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
@ -9028,8 +9041,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
|
||||
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
|
||||
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
|
||||
//pQueryInfo->globalMerge = tscIsTwoStageSTableQuery(pQueryInfo, 0);
|
||||
|
||||
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
|
||||
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
|
||||
|
||||
|
|
|
@ -903,16 +903,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
SQueryAttr query = {{0}};
|
||||
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
|
||||
query.vgId = pTableMeta->vgId;
|
||||
|
||||
SArray* tableScanOperator = createTableScanPlan(&query);
|
||||
SArray* queryOperator = createExecOperatorPlan(&query);
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
|
||||
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
|
||||
|
||||
|
|
|
@ -1228,11 +1228,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
|||
if (pCond && pCond->cond) {
|
||||
createQueryFilter(pCond->cond, pCond->len, &pFilters);
|
||||
}
|
||||
//createInputDataFlterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
|
||||
}
|
||||
|
||||
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilters);
|
||||
|
||||
pOutput->precision = pSqlObjList[0]->res.precision;
|
||||
|
||||
SSchema* schema = NULL;
|
||||
|
|
|
@ -421,7 +421,6 @@ typedef struct STableScanInfo {
|
|||
int32_t *rowCellInfoOffset;
|
||||
SExprInfo *pExpr;
|
||||
SSDataBlock block;
|
||||
bool loadExternalRows; // load external rows (prev & next rows)
|
||||
int32_t numOfOutput;
|
||||
int64_t elapsedTime;
|
||||
|
||||
|
@ -579,6 +578,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
|
||||
|
||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
|
||||
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
||||
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
|
||||
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
|
||||
|
|
|
@ -223,6 +223,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
|||
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
|
||||
|
@ -2036,6 +2037,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_StateWindow: {
|
||||
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
|
||||
|
@ -2052,24 +2054,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
|
||||
case OP_Filter: { // todo refactor
|
||||
int32_t numOfFilterCols = 0;
|
||||
// if (pQueryAttr->numOfFilterCols > 0) {
|
||||
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols);
|
||||
// } else {
|
||||
if (pQueryAttr->stableQuery) {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
|
||||
} else {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
|
||||
}
|
||||
// }
|
||||
if (pQueryAttr->stableQuery) {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
|
||||
} else {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2081,11 +2079,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
|
||||
case OP_MultiwayMergeSort: {
|
||||
bool groupMix = true;
|
||||
if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
||||
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
||||
groupMix = false;
|
||||
}
|
||||
|
||||
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
|
||||
4096, merger, groupMix); // TODO hack it
|
||||
4096, merger, groupMix); // TODO hack it
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2106,6 +2105,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
break;
|
||||
}
|
||||
|
||||
case OP_Order: {
|
||||
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
assert(0);
|
||||
}
|
||||
|
@ -5213,8 +5217,8 @@ static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
|
|||
|
||||
int32_t numOfCols = pSrc->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i);
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||
|
||||
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
|
||||
char* tmp = realloc(pCol2->pData, newSize);
|
||||
|
@ -5227,6 +5231,7 @@ static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
|
|||
}
|
||||
}
|
||||
|
||||
pDest->info.rows += pSrc->info.rows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -5277,7 +5282,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
|
|||
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
|
||||
|
||||
{
|
||||
|
@ -5292,17 +5297,20 @@ SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
pDataBlock->info.numOfCols = numOfOutput;
|
||||
pInfo->pDataBlock = pDataBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "Order";
|
||||
pOperator->name = "InMemoryOrder";
|
||||
pOperator->operatorType = OP_Order;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exec = doSort;
|
||||
pOperator->cleanup = NULL;
|
||||
pOperator->cleanup = destroyOrderOperatorInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
@ -6197,6 +6205,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
|
||||
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
|
||||
}
|
||||
|
||||
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param;
|
||||
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
|
||||
|
@ -6494,7 +6507,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
|
|||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
pOperator->exec = doFill;
|
||||
pOperator->cleanup = destroySFillOperatorInfo;
|
||||
|
||||
|
|
|
@ -557,10 +557,9 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
int32_t op = 0;
|
||||
|
||||
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
|
||||
if (onlyQueryTags(pQueryAttr)) {
|
||||
op = OP_TagScan;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
op = OP_TagScan;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->distinct) {
|
||||
op = OP_Distinct;
|
||||
taosArrayPush(plan, &op);
|
||||
|
@ -643,8 +642,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
taosArrayPush(plan, &op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// outer query order by support
|
||||
int32_t orderColId = pQueryAttr->order.orderColId;
|
||||
if (pQueryAttr->vgId == 0 && orderColId != PRIMARYKEY_TIMESTAMP_COL_INDEX && orderColId != INT32_MIN) {
|
||||
op = OP_Order;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
|
||||
op = OP_Limit;
|
||||
|
|
Loading…
Reference in New Issue