commit
220f667042
|
@ -1947,12 +1947,13 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
|
|||
if (pQueryInfo == NULL) {
|
||||
return false;
|
||||
}
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) != TSDB_QUERY_TYPE_STABLE_QUERY) {
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) != TSDB_QUERY_TYPE_STABLE_QUERY
|
||||
&& (pQueryInfo->type & TSDB_QUERY_TYPE_TABLE_QUERY) != TSDB_QUERY_TYPE_TABLE_QUERY) {
|
||||
return false;
|
||||
}
|
||||
if (tscQueryTags(pQueryInfo) && tscNumOfExprs(pQueryInfo) == 1){
|
||||
if (tscNumOfExprs(pQueryInfo) == 1){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2046,7 +2047,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
|
|||
const char* msg1 = "too many items in selection clause";
|
||||
const char* msg2 = "functions or others can not be mixed up";
|
||||
const char* msg3 = "not support query expression";
|
||||
const char* msg4 = "only support distinct one tag";
|
||||
const char* msg4 = "only support distinct one column or tag";
|
||||
const char* msg5 = "invalid function name";
|
||||
|
||||
// too many result columns not support order by in query
|
||||
|
@ -2106,13 +2107,13 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
|
|||
}
|
||||
|
||||
if (hasDistinct == true) {
|
||||
if (!isValidDistinctSql(pQueryInfo)) {
|
||||
if (!isValidDistinctSql(pQueryInfo) ) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
pQueryInfo->distinctTag = true;
|
||||
pQueryInfo->distinct = true;
|
||||
}
|
||||
|
||||
|
||||
// there is only one user-defined column in the final result field, add the timestamp column.
|
||||
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
if ((numOfSrcCols <= 0 || !hasNoneUserDefineExpr(pQueryInfo)) && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
|
||||
|
@ -3976,8 +3977,10 @@ static int32_t getTablenameCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
|
|||
|
||||
static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t relOptr) {
|
||||
if (pExpr == NULL) {
|
||||
pQueryInfo->onlyHasTagCond &= true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pQueryInfo->onlyHasTagCond &= false;
|
||||
|
||||
if (!tSqlExprIsParentOfLeaf(pExpr)) { // internal node
|
||||
int32_t ret = getColumnQueryCondInfo(pCmd, pQueryInfo, pExpr->pLeft, pExpr->tokenId);
|
||||
|
@ -4104,6 +4107,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS
|
|||
|
||||
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
|
||||
if (pExpr == NULL) {
|
||||
pQueryInfo->onlyHasTagCond &= true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -4783,8 +4787,11 @@ static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlE
|
|||
int32_t code = 0;
|
||||
|
||||
if (pExpr == NULL) {
|
||||
pQueryInfo->onlyHasTagCond &= true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pQueryInfo->onlyHasTagCond &= false;
|
||||
|
||||
|
||||
if (!tSqlExprIsParentOfLeaf(pExpr)) {
|
||||
if (pExpr->tokenId == TK_OR) {
|
||||
|
@ -4833,11 +4840,13 @@ static int32_t validateJoinExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr
|
|||
|
||||
if (!QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
|
||||
if (pQueryInfo->numOfTables == 1) {
|
||||
pQueryInfo->onlyHasTagCond &= true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
}
|
||||
pQueryInfo->onlyHasTagCond &= false;
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // for stable join, tag columns
|
||||
|
@ -5150,7 +5159,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
|
|||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
const char* msg1 = "invalid expression";
|
||||
const char* msg2 = "invalid filter expression";
|
||||
|
||||
|
@ -5183,6 +5192,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
|
|||
if ((ret = getTimeRangeFromExpr(&pSql->cmd, pQueryInfo, condExpr.pTimewindow)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// 3. get the tag query condition
|
||||
if ((ret = getTagQueryCondExpr(&pSql->cmd, pQueryInfo, &condExpr, pExpr)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5492,7 +5502,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
|
||||
if (pQueryInfo->distinctTag == true) {
|
||||
if (pQueryInfo->distinct == true) {
|
||||
pQueryInfo->order.order = TSDB_ORDER_ASC;
|
||||
pQueryInfo->order.orderColId = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -8565,7 +8575,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
pQueryInfo->onlyHasTagCond = true;
|
||||
// set where info
|
||||
if (pSqlNode->pWhere != NULL) {
|
||||
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -8588,6 +8598,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (isSTable && tscQueryTags(pQueryInfo) && pQueryInfo->distinct && !pQueryInfo->onlyHasTagCond) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
// parse the window_state
|
||||
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
|
|
|
@ -2892,7 +2892,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
|
||||
pParentSql->self, pSql->self, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
|
||||
|
||||
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfo(&pParentSql->cmd)->distinctTag)) {
|
||||
if (num > tsMaxNumOfOrderedResults && /*tscIsProjectionQueryOnSTable(pQueryInfo, 0) &&*/ !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) {
|
||||
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
|
||||
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
|
||||
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
|
||||
|
|
|
@ -4614,7 +4614,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
||||
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
||||
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
||||
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
|
||||
pQueryAttr->distinct = pQueryInfo->distinct;
|
||||
pQueryAttr->sw = pQueryInfo->sessionWindow;
|
||||
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
|
||||
|
||||
|
|
|
@ -216,7 +216,7 @@ typedef struct SQueryAttr {
|
|||
bool simpleAgg;
|
||||
bool pointInterpQuery; // point interpolation query
|
||||
bool needReverseScan; // need reverse scan
|
||||
bool distinctTag; // distinct tag query
|
||||
bool distinct; // distinct query or not
|
||||
bool stateWindow; // window State on sub/normal table
|
||||
bool createFilterOperator; // if filter operator is needed
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
|
@ -514,6 +514,7 @@ typedef struct SDistinctOperatorInfo {
|
|||
bool recordNullVal; //has already record the null value, no need to try again
|
||||
int64_t threshold;
|
||||
int64_t outputCapacity;
|
||||
int32_t colIndex;
|
||||
} SDistinctOperatorInfo;
|
||||
|
||||
struct SGlobalMerger;
|
||||
|
|
|
@ -121,7 +121,8 @@ typedef struct SQueryInfo {
|
|||
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
|
||||
|
||||
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
|
||||
bool distinctTag; // distinct tag or not
|
||||
bool distinct; // distinct tag or not
|
||||
bool onlyHasTagCond;
|
||||
int32_t round; // 0/1/....
|
||||
int32_t bufLen;
|
||||
char* buf;
|
||||
|
|
|
@ -6479,7 +6479,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
|||
pOperator->status = OP_EXEC_DONE;
|
||||
qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_QID(pRuntimeEnv), count);
|
||||
} else { // return only the tags|table name etc.
|
||||
SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
||||
|
||||
count = 0;
|
||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||
|
@ -6565,13 +6565,25 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
|||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
break;
|
||||
}
|
||||
if (pInfo->colIndex == -1) {
|
||||
for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) {
|
||||
pInfo->colIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pInfo->colIndex == -1) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert(pBlock->info.numOfCols == 1);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
int16_t type = pColInfoData->info.type;
|
||||
|
@ -6623,7 +6635,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
|||
|
||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
|
||||
|
||||
pInfo->colIndex = -1;
|
||||
pInfo->threshold = 10000000; // distinct result threshold
|
||||
pInfo->outputCapacity = 4096;
|
||||
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK);
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
|
||||
|
@ -6638,6 +6651,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->exec = hashDistinct;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->cleanup = destroyDistinctOperatorInfo;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
|
|
|
@ -104,7 +104,7 @@ static SQueryNode* doAddTableColumnNode(SQueryInfo* pQueryInfo, STableMetaInfo*
|
|||
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
||||
SQueryNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL);
|
||||
|
||||
if (pQueryInfo->distinctTag) {
|
||||
if (pQueryInfo->distinct) {
|
||||
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL);
|
||||
}
|
||||
|
||||
|
@ -551,9 +551,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
int32_t op = 0;
|
||||
|
||||
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
|
||||
op = OP_TagScan;
|
||||
taosArrayPush(plan, &op);
|
||||
if (pQueryAttr->distinctTag) {
|
||||
if (onlyQueryTags(pQueryAttr)) {
|
||||
op = OP_TagScan;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
if (pQueryAttr->distinct) {
|
||||
op = OP_Distinct;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
@ -630,8 +632,13 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
} else {
|
||||
op = OP_Project;
|
||||
taosArrayPush(plan, &op);
|
||||
if (pQueryAttr->distinct) {
|
||||
op = OP_Distinct;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
|
||||
op = OP_Limit;
|
||||
|
@ -651,7 +658,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
|
|||
int32_t op = OP_MultiwayMergeSort;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->distinctTag) {
|
||||
if (pQueryAttr->distinct) {
|
||||
op = OP_Distinct;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ class TDTestCase:
|
|||
tdSql.error("select last_row as latest from st")
|
||||
|
||||
# query distinct on normal colnum
|
||||
tdSql.error("select distinct tagtype from st")
|
||||
#tdSql.error("select distinct tagtype from st")
|
||||
|
||||
# query .. order by non-time field
|
||||
tdSql.error("select * from st order by name")
|
||||
|
|
Loading…
Reference in New Issue