diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b72bd78b1b..771e92a7e4 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2892,7 +2892,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d", pParentSql->self, pSql->self, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx); - if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfo(&pParentSql->cmd)->distinctTag)) { + if (num > tsMaxNumOfOrderedResults && /*tscIsProjectionQueryOnSTable(pQueryInfo, 0) &&*/ !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) { tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num); tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a78e5fa5f2..e8750b7758 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -4597,7 +4597,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); - pQueryAttr->distinctTag = pQueryInfo->distinctTag; + pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->sw = pQueryInfo->sessionWindow; pQueryAttr->stateWindow = pQueryInfo->stateWindow; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 4581ba258d..ce70a9ba4a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -216,7 +216,7 @@ typedef struct SQueryAttr { bool simpleAgg; bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan - bool distinctTag; // distinct tag query + bool distinct; // distinct query or not bool stateWindow; // window State on sub/normal table bool createFilterOperator; // if filter operator is needed int32_t interBufSize; // intermediate buffer sizse @@ -514,6 +514,7 @@ typedef struct SDistinctOperatorInfo { bool recordNullVal; //has already record the null value, no need to try again int64_t threshold; int64_t outputCapacity; + int32_t colIndex; } SDistinctOperatorInfo; struct SGlobalMerger; diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index 56eea6429f..520dade668 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -121,7 +121,8 @@ typedef struct SQueryInfo { int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX - bool distinctTag; // distinct tag or not + bool distinct; // distinct tag or not + bool onlyHasTagCond; int32_t round; // 0/1/.... int32_t bufLen; char* buf; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 288a206491..1ed3f7759f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6479,7 +6479,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { pOperator->status = OP_EXEC_DONE; qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_QID(pRuntimeEnv), count); } else { // return only the tags|table name etc. - SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo + SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo count = 0; while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { @@ -6565,13 +6565,25 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + break; + } + if (pInfo->colIndex == -1) { + for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i); + if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) { + pInfo->colIndex = i; + break; + } + } + } + if (pInfo->colIndex == -1) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; return NULL; } - - assert(pBlock->info.numOfCols == 1); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); int16_t bytes = pColInfoData->info.bytes; int16_t type = pColInfoData->info.type; @@ -6614,7 +6626,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); - + pInfo->colIndex = -1; + pInfo->threshold = 10000000; // distinct result threshold pInfo->outputCapacity = 4096; pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); @@ -6628,6 +6641,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = hashDistinct; + pOperator->pExpr = pExpr; pOperator->cleanup = destroyDistinctOperatorInfo; appendUpstream(pOperator, upstream); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index e724b0418c..a94d015e06 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -104,7 +104,7 @@ static SQueryNode* doAddTableColumnNode(SQueryInfo* pQueryInfo, STableMetaInfo* int32_t num = (int32_t) taosArrayGetSize(pExprs); SQueryNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL); - if (pQueryInfo->distinctTag) { + if (pQueryInfo->distinct) { pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL); } @@ -551,9 +551,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { int32_t op = 0; if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query - op = OP_TagScan; - taosArrayPush(plan, &op); - if (pQueryAttr->distinctTag) { + if (onlyQueryTags(pQueryAttr)) { + op = OP_TagScan; + taosArrayPush(plan, &op); + } + if (pQueryAttr->distinct) { op = OP_Distinct; taosArrayPush(plan, &op); } @@ -630,8 +632,13 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { } else { op = OP_Project; taosArrayPush(plan, &op); + if (pQueryAttr->distinct) { + op = OP_Distinct; + taosArrayPush(plan, &op); + } } } + if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { op = OP_Limit; @@ -651,7 +658,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { int32_t op = OP_MultiwayMergeSort; taosArrayPush(plan, &op); - if (pQueryAttr->distinctTag) { + if (pQueryAttr->distinct) { op = OP_Distinct; taosArrayPush(plan, &op); }