diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 8baa2abf7d..a01e426409 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1120,13 +1120,16 @@ static void doExecuteFinalMerge( SLocalMerger *pLocalMerge, int32_t numOfExpr, b } static void savePrevOrderColumns(SMultiwayMergeInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { - int32_t size = taosArrayGetSize(pInfo->orderColumnList); + int32_t size = pInfo->pMerge->pDesc->orderInfo.numOfCols; for(int32_t i = 0; i < size; ++i) { - int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i); + int32_t index = pInfo->pMerge->pDesc->orderInfo.colIndex[i]; +// int32_t index = *(int16_t*)taosArrayGet(pInfo->orderColumnList, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index); memcpy(pInfo->prevRow[i], pColInfo->pData + pColInfo->info.bytes * rowIndex, pColInfo->info.bytes); } + + pInfo->hasPrev = true; } static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) { @@ -1141,10 +1144,26 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, continue; } + pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } } else { - // todo finalize the result + for(int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + pCtx[j].size = 1; + aAggs[functionId].xFinalize(&pCtx[j]); + } + + pInfo->binfo.pRes->info.rows += 1; + + for(int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pOutput += pCtx[j].outputBytes; + pCtx[j].pInput += pCtx[j].inputBytes; + } for (int32_t j = 0; j < numOfExpr; ++j) { int32_t functionId = pCtx[j].functionId; @@ -1152,13 +1171,10 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, continue; } - pCtx[j].pOutput += pCtx[j].outputBytes; - pCtx[j].pInput += pCtx[j].inputBytes; - + pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } - pInfo->binfo.pRes->info.rows += 1; } } else { for (int32_t j = 0; j < numOfExpr; ++j) { @@ -1167,13 +1183,24 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, continue; } + pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } } savePrevOrderColumns(pInfo, pBlock, i); - pInfo->hasPrev = true; } + + for(int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + aAggs[functionId].xFinalize(&pCtx[j]); + } + + pInfo->binfo.pRes->info.rows += 1; + } static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { @@ -1891,12 +1918,6 @@ SSDataBlock* doGlobalAggregate(void* param) { pOperator->status = OP_EXEC_DONE; setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); -// finalizeQueryResult(pOperator, pAggInfo->binfo.pCtx, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); - pAggInfo->binfo.pRes->info.rows = getNumOfResult(pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); - - pOperator->status = OP_EXEC_DONE; - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); - return pAggInfo->binfo.pRes; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 38594f8499..eda06612d9 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -525,17 +525,17 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) { assert(pRes->numOfCols > 0); - int32_t offset = 0; +// int32_t offset = 0; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - pRes->urow[i] = pColData->pData + offset * pColData->info.bytes; + pRes->urow[i] = pColData->pData/* + offset * pColData->info.bytes*/; pRes->length[i] = pInfo->field.bytes; - offset += pInfo->field.bytes; + //offset += pInfo->field.bytes; // generated the user-defined column result if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { @@ -3337,6 +3337,7 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu pse->colType = pExpr->base.resType; pse->colBytes = pExpr->base.resBytes; + pse->colInfo.flag = TSDB_COL_NORMAL; for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { tVariantAssign(&pse->param[j], &pExpr->base.param[j]); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index dc942e632d..70c1e551d1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1734,8 +1734,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_GlobalAggregate: { - pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1); + pRuntimeEnv->proot = + createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, + pQueryAttr->numOfExpr3, &pQueryAttr->order.orderColId, 1); break; } @@ -3861,8 +3862,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in return pFillCol; } -int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t tbScanner, SArray* pOperator, - void* param) { +int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, SArray* prevResult, void* tsdb, int32_t tbScanner, + SArray* pOperator, void* param) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; @@ -4299,13 +4300,15 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime return pOptr; } -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, - int32_t* orderColumn, int32_t numOfOrder) { +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, + SExprInfo* pExpr, int32_t numOfOutput, int32_t* orderColumn, + int32_t numOfOrder) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t numOfRows = - (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); +// SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + int32_t numOfRows = 4096; +// int32_t numOfRows = +// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); @@ -4322,7 +4325,9 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; int32_t index = orderColumn[i]; - offset += pExpr[index].base.resBytes; + if (index != INT32_MIN) { + offset += pExpr[index].base.resBytes; + } } pInfo->orderColumnList = taosArrayFromList(orderColumn, numOfOrder, sizeof(int32_t)); diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index 79f86b1609..8d4c24ee67 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -485,25 +485,6 @@ int32_t compare_aRv(SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, i return ret; } } -// char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); -// char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); - -// if (pDescriptor->pColumnModel->pFields[colIdx].field.type == TSDB_DATA_TYPE_TIMESTAMP) { -// int32_t ret = primaryKeyComparator(*(int64_t *)f1, *(int64_t *)f2, colIdx, pDescriptor->tsOrder); -// if (ret == 0) { -// continue; -// } else { -// return ret; -// } -// } else { -// SSchemaEx *pSchema = &pDescriptor->pColumnModel->pFields[colIdx]; -// int32_t ret = columnValueAscendingComparator(f1, f2, pSchema->field.type, pSchema->field.bytes); -// if (ret == 0) { -// continue; -// } else { -// return ret; -// } -// } } return 0;