Merge pull request #7390 from taosdata/feature/TD-6087
[TD-6087]<merge> merge distinct to master
This commit is contained in:
commit
4c07b00472
|
@ -1940,20 +1940,6 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) {
|
|||
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
|
||||
}
|
||||
|
||||
bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
|
||||
if (pQueryInfo == NULL) {
|
||||
return false;
|
||||
}
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) != TSDB_QUERY_TYPE_STABLE_QUERY
|
||||
&& (pQueryInfo->type & TSDB_QUERY_TYPE_TABLE_QUERY) != TSDB_QUERY_TYPE_TABLE_QUERY) {
|
||||
return false;
|
||||
}
|
||||
if (tscNumOfExprs(pQueryInfo) == 1){
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool hasNoneUserDefineExpr(SQueryInfo* pQueryInfo) {
|
||||
size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
|
@ -2043,9 +2029,11 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
|
|||
const char* msg1 = "too many items in selection clause";
|
||||
const char* msg2 = "functions or others can not be mixed up";
|
||||
const char* msg3 = "not support query expression";
|
||||
const char* msg4 = "only support distinct one column or tag";
|
||||
const char* msg4 = "not support distinct mixed with proj/agg func";
|
||||
const char* msg5 = "invalid function name";
|
||||
const char* msg6 = "_block_dist not support subquery, only support stable/table";
|
||||
const char* msg6 = "not support distinct mixed with join";
|
||||
const char* msg7 = "not support distinct mixed with groupby";
|
||||
const char* msg8 = "not support distinct in nest query";
|
||||
|
||||
// too many result columns not support order by in query
|
||||
if (taosArrayGetSize(pSelNodeList) > TSDB_MAX_COLUMNS) {
|
||||
|
@ -2056,18 +2044,25 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
|
|||
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
|
||||
bool hasDistinct = false;
|
||||
bool hasAgg = false;
|
||||
size_t numOfExpr = taosArrayGetSize(pSelNodeList);
|
||||
int32_t distIdx = -1;
|
||||
for (int32_t i = 0; i < numOfExpr; ++i) {
|
||||
int32_t outputIndex = (int32_t)tscNumOfExprs(pQueryInfo);
|
||||
tSqlExprItem* pItem = taosArrayGet(pSelNodeList, i);
|
||||
|
||||
if (hasDistinct == false) {
|
||||
hasDistinct = (pItem->distinct == true);
|
||||
distIdx = hasDistinct ? i : -1;
|
||||
}
|
||||
|
||||
int32_t type = pItem->pNode->type;
|
||||
if (type == SQL_NODE_SQLFUNCTION) {
|
||||
hasAgg = true;
|
||||
if (hasDistinct) break;
|
||||
|
||||
pItem->pNode->functionId = isValidFunction(pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n);
|
||||
|
||||
if (pItem->pNode->functionId == TSDB_FUNC_BLKINFO && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
|
||||
|
@ -2108,10 +2103,22 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
|
|||
}
|
||||
}
|
||||
|
||||
//TODO(dengyihao), refactor as function
|
||||
//handle distinct func mixed with other func
|
||||
if (hasDistinct == true) {
|
||||
if (!isValidDistinctSql(pQueryInfo) ) {
|
||||
if (distIdx != 0 || hasAgg) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
if (joinQuery) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
|
||||
}
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols != 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
|
||||
}
|
||||
if (pQueryInfo->pDownstream != NULL) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
|
||||
}
|
||||
|
||||
pQueryInfo->distinct = true;
|
||||
}
|
||||
|
||||
|
@ -5512,6 +5519,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
const char* msg1 = "invalid column name";
|
||||
const char* msg2 = "order by primary timestamp, first tag or groupby column in groupby clause allowed";
|
||||
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed";
|
||||
const char* msg4 = "orderby column must projected in subquery";
|
||||
|
||||
setDefaultOrderInfo(pQueryInfo);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
@ -5627,6 +5635,17 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
|
|||
|
||||
// orderby ts query on super table
|
||||
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
bool found = false;
|
||||
for (int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
|
||||
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
|
||||
if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found && pQueryInfo->pDownstream) {
|
||||
return invalidOperationMsg(pMsgBuf, msg4);
|
||||
}
|
||||
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
|
||||
}
|
||||
}
|
||||
|
@ -8422,6 +8441,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
|
|||
pSub->pUdfInfo = pUdfInfo;
|
||||
pSub->udfCopy = true;
|
||||
|
||||
pSub->pDownstream = pQueryInfo;
|
||||
int32_t code = validateSqlNode(pSql, p, pSub);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
|
|
@ -409,7 +409,7 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
|||
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
|
||||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY))) {
|
||||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
|
||||
// do nothing in case of super table subquery
|
||||
} else {
|
||||
pSql->retry += 1;
|
||||
|
|
|
@ -3533,8 +3533,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
|
|||
pNewQueryInfo->numOfTables = 0;
|
||||
pNewQueryInfo->pTableMetaInfo = NULL;
|
||||
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
|
||||
|
||||
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
|
||||
|
||||
|
||||
pNewQueryInfo->distinct = pQueryInfo->distinct;
|
||||
if (pNewQueryInfo->buf == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
|
|
|
@ -59,6 +59,7 @@ extern char tsLocale[];
|
|||
extern char tsCharset[]; // default encode string
|
||||
extern int8_t tsEnableCoreFile;
|
||||
extern int32_t tsCompressMsgSize;
|
||||
extern int32_t tsMaxNumOfDistinctResults;
|
||||
extern char tsTempDir[];
|
||||
|
||||
//query buffer management
|
||||
|
|
|
@ -89,6 +89,9 @@ int32_t tsMaxNumOfOrderedResults = 100000;
|
|||
// 10 ms for sliding time, the value will changed in case of time precision changed
|
||||
int32_t tsMinSlidingTime = 10;
|
||||
|
||||
// the maxinum number of distict query result
|
||||
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
|
||||
|
||||
// 1 us for interval time range, changed accordingly
|
||||
int32_t tsMinIntervalTime = 1;
|
||||
|
||||
|
@ -546,6 +549,16 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "maxNumOfDistinctRes";
|
||||
cfg.ptr = &tsMaxNumOfDistinctResults;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 10*10000;
|
||||
cfg.maxValue = 10000*10000;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "numOfMnodes";
|
||||
cfg.ptr = &tsNumOfMnodes;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
|
|
|
@ -510,13 +510,21 @@ typedef struct SStateWindowOperatorInfo {
|
|||
bool reptScan;
|
||||
} SStateWindowOperatorInfo ;
|
||||
|
||||
typedef struct SDistinctDataInfo {
|
||||
int32_t index;
|
||||
int32_t type;
|
||||
int32_t bytes;
|
||||
} SDistinctDataInfo;
|
||||
|
||||
typedef struct SDistinctOperatorInfo {
|
||||
SHashObj *pSet;
|
||||
SSDataBlock *pRes;
|
||||
bool recordNullVal; //has already record the null value, no need to try again
|
||||
int64_t threshold;
|
||||
int64_t outputCapacity;
|
||||
int32_t colIndex;
|
||||
int32_t totalBytes;
|
||||
char* buf;
|
||||
SArray* pDistinctDataInfo;
|
||||
} SDistinctOperatorInfo;
|
||||
|
||||
struct SGlobalMerger;
|
||||
|
|
|
@ -44,6 +44,10 @@
|
|||
|
||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||
|
||||
#define MULTI_KEY_DELIM "-"
|
||||
|
||||
#define HASH_CAPACITY_LIMIT 10000000
|
||||
|
||||
#define TIME_WINDOW_COPY(_dst, _src) do {\
|
||||
(_dst).skey = (_src).skey;\
|
||||
(_dst).ekey = (_src).ekey;\
|
||||
|
@ -3581,6 +3585,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
|||
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
||||
|
||||
int64_t tid = 0;
|
||||
pRuntimeEnv->keyBuf = realloc(pRuntimeEnv->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
|
||||
SResultRow* pRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
|
||||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
|
@ -6534,6 +6539,8 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param;
|
||||
taosHashCleanup(pInfo->pSet);
|
||||
tfree(pInfo->buf);
|
||||
taosArrayDestroy(pInfo->pDistinctDataInfo);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
}
|
||||
|
||||
|
@ -7075,6 +7082,52 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
|||
|
||||
return pOperator;
|
||||
}
|
||||
static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) {
|
||||
if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) {
|
||||
// distinct info already inited
|
||||
return true;
|
||||
}
|
||||
for (int i = 0; i < pOperator->numOfOutput; i++) {
|
||||
pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
|
||||
}
|
||||
for (int i = 0; i < pOperator->numOfOutput; i++) {
|
||||
int numOfBlock = (int)taosArrayGetSize(pBlock->pDataBlock);
|
||||
assert(i < numOfBlock);
|
||||
for (int j = 0; j < numOfBlock; j++) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
|
||||
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resColId) {
|
||||
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
|
||||
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
|
||||
}
|
||||
}
|
||||
}
|
||||
pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
|
||||
pInfo->buf = calloc(1, pInfo->totalBytes);
|
||||
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
|
||||
}
|
||||
static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) {
|
||||
char *p = pInfo->buf;
|
||||
memset(p, 0, pInfo->totalBytes);
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
|
||||
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
|
||||
char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
|
||||
if (isNull(val, pDistDataInfo->type)) {
|
||||
p += pDistDataInfo->bytes;
|
||||
continue;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
|
||||
memcpy(p, varDataVal(val), varDataLen(val));
|
||||
p += varDataLen(val);
|
||||
} else {
|
||||
memcpy(p, val, pDistDataInfo->bytes);
|
||||
p += pDistDataInfo->bytes;
|
||||
}
|
||||
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
|
||||
p += strlen(MULTI_KEY_DELIM);
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
|
@ -7082,11 +7135,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
SDistinctOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
|
||||
|
||||
pRes->info.rows = 0;
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while(1) {
|
||||
|
@ -7099,77 +7150,60 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
|||
pOperator->status = OP_EXEC_DONE;
|
||||
break;
|
||||
}
|
||||
if (pInfo->colIndex == -1) {
|
||||
for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) {
|
||||
pInfo->colIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pInfo->colIndex == -1) {
|
||||
if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
int16_t type = pColInfoData->info.type;
|
||||
|
||||
// ensure the output buffer size
|
||||
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0);
|
||||
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
|
||||
int32_t newSize = pRes->info.rows + pBlock->info.rows;
|
||||
char* tmp = realloc(pResultColInfoData->pData, newSize * bytes);
|
||||
if (tmp == NULL) {
|
||||
return NULL;
|
||||
} else {
|
||||
pResultColInfoData->pData = tmp;
|
||||
break;
|
||||
}
|
||||
|
||||
// ensure result output buf
|
||||
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
|
||||
int32_t newSize = pRes->info.rows + pBlock->info.rows;
|
||||
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
|
||||
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
|
||||
char* tmp = realloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
|
||||
if (tmp == NULL) {
|
||||
return NULL;
|
||||
} else {
|
||||
pResultColInfoData->pData = tmp;
|
||||
}
|
||||
}
|
||||
pInfo->outputCapacity = newSize;
|
||||
}
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
char* val = ((char*)pColInfoData->pData) + bytes * i;
|
||||
if (isNull(val, type)) {
|
||||
continue;
|
||||
}
|
||||
char* p = val;
|
||||
size_t keyLen = 0;
|
||||
if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) {
|
||||
tstr* var = (tstr*)(val);
|
||||
p = var->data;
|
||||
keyLen = varDataLen(var);
|
||||
} else {
|
||||
keyLen = bytes;
|
||||
}
|
||||
|
||||
int dummy;
|
||||
void* res = taosHashGet(pInfo->pSet, p, keyLen);
|
||||
if (res == NULL) {
|
||||
taosHashPut(pInfo->pSet, p, keyLen, &dummy, sizeof(dummy));
|
||||
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
|
||||
memcpy(start, val, bytes);
|
||||
}
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
buildMultiDistinctKey(pInfo, pBlock, i);
|
||||
if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) {
|
||||
int32_t dummy;
|
||||
taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy));
|
||||
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
|
||||
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src
|
||||
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
|
||||
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
|
||||
char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
|
||||
memcpy(start, val, pDistDataInfo->bytes);
|
||||
}
|
||||
pRes->info.rows += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pRes->info.rows >= pInfo->threshold) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
|
||||
pInfo->colIndex = -1;
|
||||
pInfo->threshold = 10000000; // distinct result threshold
|
||||
pInfo->outputCapacity = 4096;
|
||||
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK);
|
||||
|
||||
pInfo->totalBytes = 0;
|
||||
pInfo->buf = NULL;
|
||||
pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold
|
||||
pInfo->outputCapacity = 4096;
|
||||
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
|
||||
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
|
Loading…
Reference in New Issue