diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9a75830ec4..526f43cb64 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -232,9 +232,11 @@ typedef struct STaskAttr { } STaskAttr; struct SOperatorInfo; +struct SAggSupporter; +struct SOptrBasicInfo; -typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); -typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); +typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length); +typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); @@ -753,6 +755,9 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); +bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length); +void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d344b93485..8addd53479 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4974,15 +4974,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); - char *result = NULL; - int32_t length = 0; - pOperator->encodeResultRow(pOperator, &result, &length); - SAggSupporter *pSup = &pAggInfo->aggSup; - taosHashClear(pSup->pResultRowHashTable); - pOperator->decodeResultRow(pOperator, result, length); - if(result){ - taosMemoryFree(result); + if(pOperator->encodeResultRow){ + char *result = NULL; + int32_t length = 0; + SAggSupporter *pSup = &pAggInfo->aggSup; + pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length); + taosHashClear(pSup->pResultRowHashTable); + pInfo->resultRowInfo.size = 0; + pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length); + if(result){ + taosMemoryFree(result); + } } + } finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); @@ -5011,43 +5015,33 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; } -static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) { - SAggSupporter *pSup = NULL; - switch(pOperator->operatorType){ - case QUERY_NODE_PHYSICAL_PLAN_AGG:{ - SAggOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ - SGroupbyOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ - STableIntervalOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - default:{ - qDebug("invalid operatorType: %d", pOperator->operatorType); - } - } - +void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length) { int32_t size = taosHashGetSize(pSup->pResultRowHashTable); - size_t keyLen = POINTER_BYTES; // estimate the key length + size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); *result = taosMemoryCalloc(1, totalSize); if (*result == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } *(int32_t*)(*result) = size; int32_t offset = sizeof(int32_t); + + // prepare memory + SResultRowPosition* pos = &pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.curPos]; + void* pPage = getBufPage(pSup->pResultBuf, pos->pageId); + SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); + setBufPageDirty(pPage, true); + releaseBufPage(pSup->pResultBuf, pPage); + void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); while (pIter) { void* key = taosHashGetKey(pIter, &keyLen); - SResultRow** p1 = (SResultRow**)pIter; + SResultRowPosition** p1 = (SResultRowPosition**)pIter; + + pPage = (SFilePage*) getBufPage(pSup->pResultBuf, (*p1)->pageId); + pRow = (SResultRow*)((char*)pPage + (*p1)->offset); + setBufPageDirty(pPage, true); + releaseBufPage(pSup->pResultBuf, pPage); // recalculate the result size int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize; @@ -5057,7 +5051,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(*result); *result = NULL; - return; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } else { *result = tmp; } @@ -5071,7 +5065,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t // save value *(int32_t*)(*result + offset) = pSup->resultRowSize; offset += sizeof(int32_t); - memcpy(*result + offset, *p1, pSup->resultRowSize); + memcpy(*result + offset, pRow, pSup->resultRowSize); offset += pSup->resultRowSize; pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); @@ -5083,34 +5077,11 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t return; } -static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { +bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length) { if (!result || length <= 0) { return false; } - SAggSupporter *pSup = NULL; - switch(pOperator->operatorType){ - case QUERY_NODE_PHYSICAL_PLAN_AGG:{ - SAggOperatorInfo *pAggInfo = pOperator->info; - //SOptrBasicInfo *pInfo = &pAggInfo->binfo; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ - SGroupbyOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ - STableIntervalOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - default:{ - qDebug("invalid operatorType: %d", pOperator->operatorType); - } - } - // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t count = *(int32_t*)(result); @@ -5122,17 +5093,16 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l uint64_t tableGroupId = *(uint64_t*)(result + offset); SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); if (!resultRow) { - terrno = TSDB_CODE_TSC_INVALID_INPUT; - return false; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } // add a new result set for a new group - taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES); + SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; + taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); offset += keyLen; int32_t valueLen = *(int32_t*)(result + offset); if (valueLen != pSup->resultRowSize) { - terrno = TSDB_CODE_TSC_INVALID_INPUT; - return false; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } offset += sizeof(int32_t); int32_t pageId = resultRow->pageId; @@ -5143,12 +5113,13 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l offset += valueLen; initResultRow(resultRow); - //pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; + prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env); + pInfo->resultRowInfo.curPos = pInfo->resultRowInfo.size; + pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; } if (offset != length) { - terrno = TSDB_CODE_TSC_INVALID_INPUT; - return false; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } return true; } @@ -5375,16 +5346,21 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); - char *result = NULL; - int32_t length = 0; - pOperator->encodeResultRow(pOperator, &result, &length); - SAggSupporter *pSup = &pInfo->aggSup; - taosHashClear(pSup->pResultRowHashTable); - pOperator->decodeResultRow(pOperator, result, length); - if(result){ - taosMemoryFree(result); +#if 0 + if(pOperator->encodeResultRow){ + char *result = NULL; + int32_t length = 0; + SAggSupporter *pSup = &pInfo->aggSup; + pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length); + taosHashClear(pSup->pResultRowHashTable); + pInfo->binfo.resultRowInfo.size = 0; + pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length); + if(result){ + taosMemoryFree(result); + } } } +#endif closeAllResultRows(&pInfo->binfo.resultRowInfo); finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, @@ -6335,6 +6311,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->info = pInfo; pOperator->getNextFn = doStateWindowAgg; pOperator->closeFn = destroyStateWindowOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6374,6 +6352,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->info = pInfo; pOperator->getNextFn = doSessionWindowAgg; pOperator->closeFn = destroySWindowOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3eb8ff1b72..1de33466e3 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -165,7 +165,7 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { // assign the group keys or user input constant values if required static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) { for (int32_t i = 0; i < numOfOutput; ++i) { - if (pCtx[i].functionId == -1) { + if (pCtx[i].functionId == -1) { // select count(*),key from t group by key. SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; @@ -337,7 +337,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Groupby; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; @@ -345,6 +345,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); return pOperator;