TD-5438
This commit is contained in:
parent
0388ffa72f
commit
a54d3a2b96
|
@ -2892,7 +2892,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
||||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
|
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
|
||||||
pParentSql->self, pSql->self, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
|
pParentSql->self, pSql->self, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
|
||||||
|
|
||||||
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfo(&pParentSql->cmd)->distinctTag)) {
|
if (num > tsMaxNumOfOrderedResults && /*tscIsProjectionQueryOnSTable(pQueryInfo, 0) &&*/ !(tscGetQueryInfo(&pParentSql->cmd)->distinct)) {
|
||||||
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
|
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
|
||||||
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
|
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
|
||||||
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
|
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
|
||||||
|
|
|
@ -4597,7 +4597,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
||||||
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
||||||
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
||||||
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
||||||
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
|
pQueryAttr->distinct = pQueryInfo->distinct;
|
||||||
pQueryAttr->sw = pQueryInfo->sessionWindow;
|
pQueryAttr->sw = pQueryInfo->sessionWindow;
|
||||||
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
|
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
|
||||||
|
|
||||||
|
|
|
@ -216,7 +216,7 @@ typedef struct SQueryAttr {
|
||||||
bool simpleAgg;
|
bool simpleAgg;
|
||||||
bool pointInterpQuery; // point interpolation query
|
bool pointInterpQuery; // point interpolation query
|
||||||
bool needReverseScan; // need reverse scan
|
bool needReverseScan; // need reverse scan
|
||||||
bool distinctTag; // distinct tag query
|
bool distinct; // distinct query or not
|
||||||
bool stateWindow; // window State on sub/normal table
|
bool stateWindow; // window State on sub/normal table
|
||||||
bool createFilterOperator; // if filter operator is needed
|
bool createFilterOperator; // if filter operator is needed
|
||||||
int32_t interBufSize; // intermediate buffer sizse
|
int32_t interBufSize; // intermediate buffer sizse
|
||||||
|
@ -514,6 +514,7 @@ typedef struct SDistinctOperatorInfo {
|
||||||
bool recordNullVal; //has already record the null value, no need to try again
|
bool recordNullVal; //has already record the null value, no need to try again
|
||||||
int64_t threshold;
|
int64_t threshold;
|
||||||
int64_t outputCapacity;
|
int64_t outputCapacity;
|
||||||
|
int32_t colIndex;
|
||||||
} SDistinctOperatorInfo;
|
} SDistinctOperatorInfo;
|
||||||
|
|
||||||
struct SGlobalMerger;
|
struct SGlobalMerger;
|
||||||
|
|
|
@ -121,7 +121,8 @@ typedef struct SQueryInfo {
|
||||||
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
|
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
|
||||||
|
|
||||||
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
|
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
|
||||||
bool distinctTag; // distinct tag or not
|
bool distinct; // distinct tag or not
|
||||||
|
bool onlyHasTagCond;
|
||||||
int32_t round; // 0/1/....
|
int32_t round; // 0/1/....
|
||||||
int32_t bufLen;
|
int32_t bufLen;
|
||||||
char* buf;
|
char* buf;
|
||||||
|
|
|
@ -6479,7 +6479,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_QID(pRuntimeEnv), count);
|
qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_QID(pRuntimeEnv), count);
|
||||||
} else { // return only the tags|table name etc.
|
} else { // return only the tags|table name etc.
|
||||||
SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo
|
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
||||||
|
|
||||||
count = 0;
|
count = 0;
|
||||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||||
|
@ -6565,13 +6565,25 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
||||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (pInfo->colIndex == -1) {
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
|
||||||
|
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) {
|
||||||
|
pInfo->colIndex = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pInfo->colIndex == -1) {
|
||||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
|
||||||
assert(pBlock->info.numOfCols == 1);
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
|
||||||
|
|
||||||
int16_t bytes = pColInfoData->info.bytes;
|
int16_t bytes = pColInfoData->info.bytes;
|
||||||
int16_t type = pColInfoData->info.type;
|
int16_t type = pColInfoData->info.type;
|
||||||
|
@ -6614,7 +6626,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
||||||
|
|
||||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||||
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
|
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
|
||||||
|
pInfo->colIndex = -1;
|
||||||
|
pInfo->threshold = 10000000; // distinct result threshold
|
||||||
pInfo->outputCapacity = 4096;
|
pInfo->outputCapacity = 4096;
|
||||||
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK);
|
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK);
|
||||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
|
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
|
||||||
|
@ -6628,6 +6641,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||||
pOperator->exec = hashDistinct;
|
pOperator->exec = hashDistinct;
|
||||||
|
pOperator->pExpr = pExpr;
|
||||||
pOperator->cleanup = destroyDistinctOperatorInfo;
|
pOperator->cleanup = destroyDistinctOperatorInfo;
|
||||||
|
|
||||||
appendUpstream(pOperator, upstream);
|
appendUpstream(pOperator, upstream);
|
||||||
|
|
|
@ -104,7 +104,7 @@ static SQueryNode* doAddTableColumnNode(SQueryInfo* pQueryInfo, STableMetaInfo*
|
||||||
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
||||||
SQueryNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL);
|
SQueryNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL);
|
||||||
|
|
||||||
if (pQueryInfo->distinctTag) {
|
if (pQueryInfo->distinct) {
|
||||||
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL);
|
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,9 +551,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
||||||
int32_t op = 0;
|
int32_t op = 0;
|
||||||
|
|
||||||
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
|
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
|
||||||
op = OP_TagScan;
|
if (onlyQueryTags(pQueryAttr)) {
|
||||||
taosArrayPush(plan, &op);
|
op = OP_TagScan;
|
||||||
if (pQueryAttr->distinctTag) {
|
taosArrayPush(plan, &op);
|
||||||
|
}
|
||||||
|
if (pQueryAttr->distinct) {
|
||||||
op = OP_Distinct;
|
op = OP_Distinct;
|
||||||
taosArrayPush(plan, &op);
|
taosArrayPush(plan, &op);
|
||||||
}
|
}
|
||||||
|
@ -630,8 +632,13 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
||||||
} else {
|
} else {
|
||||||
op = OP_Project;
|
op = OP_Project;
|
||||||
taosArrayPush(plan, &op);
|
taosArrayPush(plan, &op);
|
||||||
|
if (pQueryAttr->distinct) {
|
||||||
|
op = OP_Distinct;
|
||||||
|
taosArrayPush(plan, &op);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
|
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
|
||||||
op = OP_Limit;
|
op = OP_Limit;
|
||||||
|
@ -651,7 +658,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
|
||||||
int32_t op = OP_MultiwayMergeSort;
|
int32_t op = OP_MultiwayMergeSort;
|
||||||
taosArrayPush(plan, &op);
|
taosArrayPush(plan, &op);
|
||||||
|
|
||||||
if (pQueryAttr->distinctTag) {
|
if (pQueryAttr->distinct) {
|
||||||
op = OP_Distinct;
|
op = OP_Distinct;
|
||||||
taosArrayPush(plan, &op);
|
taosArrayPush(plan, &op);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue