Merge pull request #11280 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
52ada128d8
|
@ -188,6 +188,7 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
|
||||||
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
|
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
|
||||||
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
|
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
|
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
|
||||||
|
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
|
||||||
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
||||||
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
||||||
|
|
||||||
|
|
|
@ -385,11 +385,11 @@ bool taos_is_update_query(TAOS_RES *res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||||
if (res == NULL) {
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
if (pRequest == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
|
||||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
||||||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -405,6 +405,25 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||||
return pResultInfo->numOfRows;
|
return pResultInfo->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
if (pRequest == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfFields = taos_num_fields(pRequest);
|
||||||
|
if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_FIELD* pField = &pRequest->body.resInfo.userFields[columnIndex];
|
||||||
|
if (!IS_VAR_DATA_TYPE(pField->type)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pRequest->body.resInfo.pCol[columnIndex].offset;
|
||||||
|
}
|
||||||
|
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
|
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
|
||||||
|
|
||||||
void taos_reset_current_db(TAOS *taos) {
|
void taos_reset_current_db(TAOS *taos) {
|
||||||
|
|
|
@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo {
|
||||||
uint64_t totalElapsed; // total elapsed time
|
uint64_t totalElapsed; // total elapsed time
|
||||||
} SSortOperatorInfo;
|
} SSortOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDistinctDataInfo {
|
|
||||||
int32_t index;
|
|
||||||
int32_t type;
|
|
||||||
int32_t bytes;
|
|
||||||
} SDistinctDataInfo;
|
|
||||||
|
|
||||||
typedef struct SDistinctOperatorInfo {
|
|
||||||
SHashObj* pSet;
|
|
||||||
SSDataBlock* pRes;
|
|
||||||
bool recordNullVal; // has already record the null value, no need to try again
|
|
||||||
// int64_t threshold; // todo remove it
|
|
||||||
// int64_t outputCapacity;// todo remove it
|
|
||||||
// int32_t totalBytes; // todo remove it
|
|
||||||
SResultInfo resInfo;
|
|
||||||
char* buf;
|
|
||||||
SArray* pDistinctDataInfo;
|
|
||||||
} SDistinctOperatorInfo;
|
|
||||||
|
|
||||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
|
@ -682,8 +664,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
|
||||||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||||
|
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
|
#if 0
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||||
SExprInfo* pExpr, int32_t numOfOutput);
|
SExprInfo* pExpr, int32_t numOfOutput);
|
||||||
|
@ -705,6 +689,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
|
||||||
|
|
||||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
|
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
|
||||||
int32_t numOfOutput);
|
int32_t numOfOutput);
|
||||||
|
#endif
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||||
|
|
||||||
|
|
|
@ -1629,7 +1629,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
||||||
// window start(end) key interpolation
|
// window start(end) key interpolation
|
||||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false);
|
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false);
|
||||||
|
|
||||||
updateTimeWindowInfo(&pInfo->timeWindowData, &win, true);
|
updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true);
|
||||||
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3295,6 +3295,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
SFilterInfo* filter = NULL;
|
SFilterInfo* filter = NULL;
|
||||||
|
|
||||||
|
// todo move to the initialization function
|
||||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||||
|
|
||||||
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
|
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
|
||||||
|
|
|
@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
|
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||||
ASSERT(pKey != NULL);
|
ASSERT(pKey != NULL);
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||||
|
|
||||||
|
@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*length = (pStart - (char*)pKey);
|
return (int32_t) (pStart - (char*)pKey);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// assign the group keys or user input constant values if required
|
// assign the group keys or user input constant values if required
|
||||||
|
@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||||
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (num > 0) {
|
if (num > 0) {
|
||||||
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||||
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||||
|
@ -351,164 +350,40 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define MULTI_KEY_DELIM "-"
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||||
|
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||||
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
|
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
||||||
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
|
|
||||||
taosHashCleanup(pInfo->pSet);
|
|
||||||
taosMemoryFreeClear(pInfo->buf);
|
|
||||||
taosArrayDestroy(pInfo->pDistinctDataInfo);
|
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) {
|
|
||||||
char* p = pInfo->buf;
|
|
||||||
// memset(p, 0, pInfo->totalBytes);
|
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) {
|
|
||||||
SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i);
|
|
||||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index);
|
|
||||||
|
|
||||||
char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId;
|
|
||||||
if (isNull(val, pDistDataInfo->type)) {
|
|
||||||
p += pDistDataInfo->bytes;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) {
|
|
||||||
memcpy(p, varDataVal(val), varDataLen(val));
|
|
||||||
p += varDataLen(val);
|
|
||||||
} else {
|
|
||||||
memcpy(p, val, pDistDataInfo->bytes);
|
|
||||||
p += pDistDataInfo->bytes;
|
|
||||||
}
|
|
||||||
memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM));
|
|
||||||
p += strlen(MULTI_KEY_DELIM);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) {
|
|
||||||
for (int i = 0; i < pOperator->numOfOutput; i++) {
|
|
||||||
// pInfo->totalBytes += pOperator->pExpr[i].base.colBytes;
|
|
||||||
}
|
|
||||||
#if 0
|
|
||||||
for (int i = 0; i < pOperator->numOfOutput; i++) {
|
|
||||||
int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock));
|
|
||||||
assert(i < numOfCols);
|
|
||||||
|
|
||||||
for (int j = 0; j < numOfCols; j++) {
|
|
||||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j);
|
|
||||||
if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) {
|
|
||||||
SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes};
|
|
||||||
taosArrayInsert(pInfo->pDistinctDataInfo, i, &item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput);
|
|
||||||
// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes);
|
|
||||||
return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) {
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDistinctOperatorInfo* pInfo = pOperator->info;
|
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
|
||||||
|
|
||||||
pRes->info.rows = 0;
|
|
||||||
SSDataBlock* pBlock = NULL;
|
|
||||||
|
|
||||||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
|
||||||
while (1) {
|
|
||||||
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
||||||
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
|
|
||||||
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensure result output buf
|
|
||||||
if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) {
|
|
||||||
int32_t newSize = pRes->info.rows + pBlock->info.rows;
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
|
|
||||||
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
|
||||||
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i);
|
|
||||||
|
|
||||||
// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes);
|
|
||||||
// if (tmp == NULL) {
|
|
||||||
// return NULL;
|
|
||||||
// } else {
|
|
||||||
// pResultColInfoData->pData = tmp;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
pInfo->resInfo.capacity = newSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
|
||||||
buildMultiDistinctKey(pInfo, pBlock, i);
|
|
||||||
if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) {
|
|
||||||
taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0);
|
|
||||||
|
|
||||||
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
|
|
||||||
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src
|
|
||||||
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
|
|
||||||
|
|
||||||
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
|
|
||||||
char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
|
|
||||||
memcpy(start, val, pDistDataInfo->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
pRes->info.rows += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRes->info.rows >= pInfo->resInfo.threshold) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
|
|
||||||
SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo));
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->resultInfo.capacity = 4096; // todo extract function.
|
pInfo->pGroupCols = pGroupColList;
|
||||||
|
pInfo->pCondition = pCondition;
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
|
||||||
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
// pInfo->totalBytes = 0;
|
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
|
||||||
pInfo->buf = NULL;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo));
|
pOperator->name = "PartitionByOperator";
|
||||||
initMultiDistinctInfo(pInfo, pOperator);
|
|
||||||
|
|
||||||
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
pOperator->name = "DistinctOperator";
|
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blockingOptr = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
// pOperator->operatorType = DISTINCT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->getNextFn = hashDistinct;
|
pOperator->_openFn = operatorDummyOpenFn;
|
||||||
pOperator->closeFn = destroyDistinctOperatorInfo;
|
pOperator->getNextFn = hashGroupbyAggregate;
|
||||||
|
pOperator->closeFn = destroyGroupbyOperatorInfo;
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
taosMemoryFreeClear(pInfo);
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFreeClear(pOperator);
|
||||||
taosMemoryFree(pOperator);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
Loading…
Reference in New Issue