diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index aa018aa84f..ac8cd82213 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo { uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; -typedef struct SDistinctDataInfo { - int32_t index; - int32_t type; - int32_t bytes; -} SDistinctDataInfo; - -typedef struct SDistinctOperatorInfo { - SHashObj* pSet; - SSDataBlock* pRes; - bool recordNullVal; // has already record the null value, no need to try again -// int64_t threshold; // todo remove it -// int64_t outputCapacity;// todo remove it -// int32_t totalBytes; // todo remove it - SResultInfo resInfo; - char* buf; - SArray* pDistinctDataInfo; -} SDistinctOperatorInfo; - int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -682,8 +664,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, + SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +#if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -705,6 +689,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); +#endif void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 38fa1f0548..c5dca0c77d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1610,7 +1610,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); + updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -3276,6 +3276,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { SFilterInfo* filter = NULL; + // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index a33a240328..b9733e118f 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, } } -static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) { +static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa } } - *length = (pStart - (char*)pKey); - return 0; + return (int32_t) (pStart - (char*)pKey); } // assign the group keys or user input constant values if required @@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { continue; } - /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); @@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } if (num > 0) { - /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); @@ -351,164 +350,40 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -#define MULTI_KEY_DELIM "-" - -static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { - SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; - taosHashCleanup(pInfo->pSet); - taosMemoryFreeClear(pInfo->buf); - taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = blockDataDestroy(pInfo->pRes); -} - -static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) { - char* p = pInfo->buf; -// memset(p, 0, pInfo->totalBytes); - - for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { - SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i); - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); - - char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; - if (isNull(val, pDistDataInfo->type)) { - p += pDistDataInfo->bytes; - continue; - } - if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { - memcpy(p, varDataVal(val), varDataLen(val)); - p += varDataLen(val); - } else { - memcpy(p, val, pDistDataInfo->bytes); - p += pDistDataInfo->bytes; - } - memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); - p += strlen(MULTI_KEY_DELIM); - } -} - -static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) { - for (int i = 0; i < pOperator->numOfOutput; i++) { - // pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; - } -#if 0 - for (int i = 0; i < pOperator->numOfOutput; i++) { - int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock)); - assert(i < numOfCols); - - for (int j = 0; j < numOfCols; j++) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); - if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) { - SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; - taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); - } - } - } -#endif - -// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); -// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes); - return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; -} - -static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SDistinctOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->pRes; - - pRes->info.rows = 0; - SSDataBlock* pBlock = NULL; - - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; - while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pDownstream->getNextFn(pDownstream, newgroup); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); - break; - } - - // ensure result output buf - if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) { - int32_t newSize = pRes->info.rows + pBlock->info.rows; - for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); - -// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); -// if (tmp == NULL) { -// return NULL; -// } else { -// pResultColInfoData->pData = tmp; -// } - } - pInfo->resInfo.capacity = newSize; - } - - for (int32_t i = 0; i < pBlock->info.rows; i++) { - buildMultiDistinctKey(pInfo, pBlock, i); - if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) { - taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0); - - for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist - - char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; - char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; - memcpy(start, val, pDistDataInfo->bytes); - } - - pRes->info.rows += 1; - } - } - - if (pRes->info.rows >= pInfo->resInfo.threshold) { - break; - } - } - - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; -} - -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { - SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, + SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pOperator->resultInfo.capacity = 4096; // todo extract function. + pInfo->pGroupCols = pGroupColList; + pInfo->pCondition = pCondition; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); -// pInfo->totalBytes = 0; - pInfo->buf = NULL; + int32_t code = initGroupOptrInfo(pInfo, pGroupColList); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo)); - initMultiDistinctInfo(pInfo, pOperator); - - pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - pOperator->name = "DistinctOperator"; + pOperator->name = "PartitionByOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; -// pOperator->operatorType = DISTINCT; - pOperator->pExpr = pExpr; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; + pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->getNextFn = hashDistinct; - pOperator->closeFn = destroyDistinctOperatorInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashGroupbyAggregate; + pOperator->closeFn = destroyGroupbyOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); return NULL; -} +} \ No newline at end of file