diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1ff0cc6925..1188925600 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -825,13 +825,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); + if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { + pSqlFuncExpr->colType = htons(pExpr->resType); + pSqlFuncExpr->colBytes = htons(pExpr->resBytes); + } else if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + SSchema *s = tGetTbnameColumnSchema(); + + pSqlFuncExpr->colType = htons(s->type); + pSqlFuncExpr->colBytes = htons(s->bytes); + } else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { + SSchema s = tGetBlockDistColumnSchema(); + + pSqlFuncExpr->colType = htons(s.type); + pSqlFuncExpr->colBytes = htons(s.bytes); + } else { + SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->colInfo.colId); + pSqlFuncExpr->colType = htons(s->type); + pSqlFuncExpr->colBytes = htons(s->bytes); + } + pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pSqlFuncExpr->resColId = htons(pExpr->resColId); pMsg += sizeof(SSqlFuncMsg); - for (int32_t j = 0; j < pExpr->numOfParams; ++j) { - // todo add log + for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen); @@ -869,14 +887,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId); pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL); + bool assign = false; for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) { SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f); if (pe == pExpr) { pSqlFuncExpr1->colInfo.colIndex = htons(f); + pSqlFuncExpr1->colType = htons(pe->resType); + pSqlFuncExpr1->colBytes = htons(pe->resBytes); + assign = true; break; } } + assert(assign); pMsg += sizeof(SSqlFuncMsg); pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; } else { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 721b9ca605..b7730405f4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -387,7 +387,7 @@ typedef struct SColIndex { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag uint16_t flag; // denote if it is a tag or a normal column - char name[TSDB_COL_NAME_LEN]; + char name[TSDB_COL_NAME_LEN]; // TODO remove it } SColIndex; /* sql function msg, to describe the message to vnode about sql function @@ -395,7 +395,10 @@ typedef struct SColIndex { typedef struct SSqlFuncMsg { int16_t functionId; int16_t numOfParams; + int16_t resColId; // result column id, id of the current output column + int16_t colType; + int16_t colBytes; SColIndex colInfo; struct ArgElem { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index c52279f4ce..07ae0b27f8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -240,6 +240,7 @@ typedef struct SQuery { } SQuery; typedef SSDataBlock* (*__operator_fn_t)(void* param); +typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; @@ -256,8 +257,6 @@ typedef struct SQueryRuntimeEnv { SHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer SResultRowPool* pool; // window result object pool - -// int32_t* rowCellInfoOffset;// offset value for each row result cell info char** prevRow; SArray* prevResult; // intermediate result, SArray @@ -285,11 +284,11 @@ typedef struct SOperatorInfo { SExprInfo *pExpr; int32_t numOfOutput; - SQueryRuntimeEnv *pRuntimeEnv; + SQueryRuntimeEnv *pRuntimeEnv; + struct SOperatorInfo *upstream; __operator_fn_t exec; - __operator_fn_t cleanup; - struct SOperatorInfo *upstream; + __optr_cleanup_fn_t cleanup; } SOperatorInfo; enum { @@ -357,6 +356,7 @@ typedef struct STableScanInfo { SQLFunctionCtx *pCtx; // next operator query context SResultRowInfo *pResultRowInfo; + int32_t numOfOutput; int32_t *rowCellInfoOffset; int64_t elapsedTime; @@ -367,19 +367,19 @@ typedef struct STagScanInfo { SSDataBlock* pRes; } STagScanInfo; -typedef struct SAggOperatorInfo { +typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; + int32_t *rowCellInfoOffset; // offset value for each row result cell info SQLFunctionCtx *pCtx; - int32_t *rowCellInfoOffset; SSDataBlock *pRes; -} SAggOperatorInfo; +} SOptrBasicInfo; + +typedef struct SOptrBasicInfo SAggOperatorInfo; +typedef struct SOptrBasicInfo SHashIntervalOperatorInfo; typedef struct SArithOperatorInfo { - SQLFunctionCtx *pCtx; - int32_t *rowCellInfoOffset; - SResultRowInfo resultRowInfo; - SSDataBlock *pOutput; - int32_t bufCapacity; + SOptrBasicInfo binfo; + int32_t bufCapacity; } SArithOperatorInfo; typedef struct SLimitOperatorInfo { @@ -392,22 +392,12 @@ typedef struct SOffsetOperatorInfo { int64_t currentOffset; } SOffsetOperatorInfo; -typedef struct SHashIntervalOperatorInfo { - SQLFunctionCtx *pCtx; - int32_t *rowCellInfoOffset; - SResultRowInfo resultRowInfo; - SSDataBlock *pRes; -} SHashIntervalOperatorInfo; - typedef struct SFillOperatorInfo { SSDataBlock *pRes; } SFillOperatorInfo; typedef struct SHashGroupbyOperatorInfo { - SQLFunctionCtx *pCtx; - int32_t *rowCellInfoOffset; - SResultRowInfo resultRowInfo; - SSDataBlock *pRes; + SOptrBasicInfo binfo; int32_t colIndex; } SHashGroupbyOperatorInfo; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 37946b5f90..b191c33048 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3566,7 +3566,6 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz; arithmeticTreeTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->pOutput, sas, pCtx->order, getArithColumnData); -// pCtx->pOutput += pCtx->outputBytes * pCtx->size; } static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 942bf0df63..ac2cc94ab0 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -183,17 +183,30 @@ static int32_t getNumOfScanTimes(SQuery* pQuery); static bool isFixedOutputQuery(SQuery* pQuery); static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); +static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); +static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); + static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); + static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); + static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); + static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv); + +static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv); static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); @@ -859,8 +872,8 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock); -static UNUSED_FUNC void doBlockwiseApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow *pWin, int32_t offset, - int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, int32_t numOfOutput) { +static void doBlockwiseApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow *pWin, + int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal, int32_t numOfOutput) { SQuery *pQuery = pRuntimeEnv->pQuery; bool hasPrev = pCtx[0].preAggVals.isSet; @@ -1093,53 +1106,6 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, const return ts; } -static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock, int32_t order) { - if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pCtx[i].size = pSDataBlock->info.rows; - pCtx[i].order = order; - - SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { - SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; - SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex); - assert(p->info.colId == pColIndex->colId); - - SQLFunctionCtx* pCtx1 = &pCtx[i]; - - pCtx1->pInput = p->pData; - pCtx1->inputType = p->info.type; - pCtx1->inputBytes = p->info.bytes; - - uint32_t status = aAggs[pCtx1->functionId].status; - if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { - SColumnInfoData *tsInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); - pCtx1->ptsList = tsInfo->pData; - } - } - } - } else { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pCtx[i].size = pSDataBlock->info.rows; - pCtx[i].order = order; - } - } -} - -static void aggApplyFunctions(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); - - int32_t functionId = pCtx[k].functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - pCtx[k].startTs = startTs;// this can be set during create the struct - aAggs[functionId].xFunction(&pCtx[k]); - } - } -} - static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSDataBlock* pSDataBlock) { sas->numOfCols = (int32_t) pSDataBlock->info.numOfCols; sas->pArithExpr = pExprInfo; @@ -1159,7 +1125,77 @@ static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSData } } -static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, SExprInfo *pExprInfo, int32_t numOfOutput) { +static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); +static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + pCtx[i].size = pBlock->info.rows; + pCtx[i].order = order; + + setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); + } +} + +static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { + if (pCtx[0].functionId == TSDB_FUNC_ARITHM) { + SArithmeticSupport* pSupport = (SArithmeticSupport*) pCtx[0].param[1].pz; + if (pSupport->colList == NULL) { + doSetInputDataBlock(pOperator, pCtx, pBlock, order); + } else { + doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); + } + } else { + if (pCtx[0].pInput == NULL && pBlock->pDataBlock != NULL) { + doSetInputDataBlock(pOperator, pCtx, pBlock, order); + } else { + doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); + } + } +} + +static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + pCtx[i].size = pBlock->info.rows; + pCtx[i].order = order; + + setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); + + if (pCtx[i].functionId == TSDB_FUNC_ARITHM) { + setArithParams((SArithmeticSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); + } else { + SColIndex* pCol = &pOperator->pExpr[i].base.colInfo; + if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { + SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; + SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); + + pCtx[i].pInput = p->pData; + + assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type && pCtx[i].inputBytes == p->info.bytes); + + uint32_t status = aAggs[pCtx[i].functionId].status; + if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { + SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); + pCtx[i].ptsList = tsInfo->pData; + } + } + } + } +} + +static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + + for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); + + int32_t functionId = pCtx[k].functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + pCtx[k].startTs = startTs;// this can be set during create the struct + aAggs[functionId].xFunction(&pCtx[k]); + } + } +} + +static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t k = 0; k < numOfOutput; ++k) { @@ -1290,16 +1326,16 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat } // TODO compare with the previous value to speedup the query processing - int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->rowCellInfoOffset); + int32_t ret = setGroupResultOutputBuf_rv(pRuntimeEnv, &pInfo->binfo.resultRowInfo, pInfo->binfo.pCtx, pOperator->numOfOutput, val, type, bytes, item->groupIndex, pInfo->binfo.rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - pInfo->pCtx[k].size = 1; // TODO refactor: extract from here - int32_t functionId = pInfo->pCtx[k].functionId; - if (functionNeedToExecute(pRuntimeEnv, &pInfo->pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pInfo->pCtx[k], offset); + pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here + int32_t functionId = pInfo->binfo.pCtx[k].functionId; + if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], offset); } } } @@ -1989,7 +2025,7 @@ void UNUSED_FUNC setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inpu //} static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t** rowCellInfoOffset) { + int32_t** rowCellInfoOffset) { SQuery* pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx)); @@ -1998,6 +2034,10 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr } *rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t)); + if (*rowCellInfoOffset == 0) { + tfree(pFuncCtx); + return NULL; + } for (int32_t i = 0; i < numOfOutput; ++i) { SSqlFuncMsg *pSqlFuncMsg = &pExpr[i].base; @@ -2012,26 +2052,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->requireNull = false; } - // set the input along with the setting of the input buffer - int32_t index = pSqlFuncMsg->colInfo.colIndex; - if (TSDB_COL_IS_TAG(pIndex->flag)) { - if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor - SSchema *s = tGetTbnameColumnSchema(); - - pCtx->inputBytes = s->bytes; - pCtx->inputType = s->type; - } else if (pIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { - SSchema s = tGetBlockDistColumnSchema(); - pCtx->inputBytes = s.bytes; - pCtx->inputType = s.type; - } else { - pCtx->inputBytes = pQuery->tagColList[index].bytes; - pCtx->inputType = pQuery->tagColList[index].type; - } - } else if (TSDB_COL_IS_UD_COL(pIndex->flag)) { - pCtx->inputBytes = pSqlFuncMsg->arg[0].argBytes; - pCtx->inputType = pSqlFuncMsg->arg[0].argType; - } + pCtx->inputBytes = pSqlFuncMsg->colBytes; + pCtx->inputType = pSqlFuncMsg->colType; pCtx->ptsOutputBuf = NULL; @@ -2093,25 +2115,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->param[2].i64 = pQuery->window.ekey; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; } else if (functionId == TSDB_FUNC_ARITHM) { - pRuntimeEnv->sasArray[i].data = calloc(pQuery->numOfCols, POINTER_BYTES); - if (pRuntimeEnv->sasArray[i].data == NULL) { - goto _clean; - } - pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i]; } if (i > 0) { -// (*offset)[i] = (*offset)[i - 1] + pFuncCtx[i - 1].outputBytes; (*rowCellInfoOffset)[i] = (*rowCellInfoOffset)[i - 1] + sizeof(SResultRowCellInfo) + pExpr[i - 1].interBytes; } } return pFuncCtx; - - _clean: - tfree(pFuncCtx); - return NULL; } static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { @@ -2163,12 +2175,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; - // if it is group by normal column, do not set output buffer, the output buffer is pResult - // fixed output query/multi-output query for normal table -// if (!pQuery->groupbyColumn && !pQuery->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { -// resetDefaultResInfoOutputBuf(pRuntimeEnv); -// } - // if (setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx) != TSDB_CODE_SUCCESS) { // goto _clean; // } @@ -2265,7 +2271,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p teardown runtime env", pQInfo); -// cleanupResultRowInfo(&pRuntimeEnv->resultRowInfo); if (isTsCompQuery(pQuery)) { FILE *f = *(FILE **)pQuery->sdata[0]->data; @@ -2276,25 +2281,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } } - -// if (pRuntimeEnv->pCtx != NULL) { -// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; -// -// for (int32_t j = 0; j < pCtx->numOfParams; ++j) { -// tVariantDestroy(&pCtx->param[j]); -// } -// -// tVariantDestroy(&pCtx->tag); -// tfree(pCtx->tagInfo.pTagCtxList); -// } -// -// tfree(pRuntimeEnv->pCtx); -// } - if (pRuntimeEnv->sasArray != NULL) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { tfree(pRuntimeEnv->sasArray[i].data); + tfree(pRuntimeEnv->sasArray[i].colList); } tfree(pRuntimeEnv->sasArray); @@ -2308,7 +2298,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pTsBuf = tsBufDestroy(pRuntimeEnv->pTsBuf); tfree(pRuntimeEnv->keyBuf); -// tfree(pRuntimeEnv->rowCellInfoOffset); tfree(pRuntimeEnv->prevRow); tfree(pRuntimeEnv->tagVal); @@ -2319,7 +2308,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); pRuntimeEnv->prevResult = NULL; - pRuntimeEnv->outputBuf = destroyOutputBuf(pRuntimeEnv->outputBuf); destroyOperatorInfo(pRuntimeEnv->proot); } @@ -3575,7 +3563,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR } void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { - SSDataBlock* pDataBlock = pInfo->pOutput; + SSDataBlock* pDataBlock = pInfo->binfo.pRes; if (pInfo->bufCapacity < pDataBlock->info.rows + numOfInputRows) { int32_t newSize = pDataBlock->info.rows + numOfInputRows; @@ -3593,7 +3581,7 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - pInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; + pInfo->binfo.pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; } } @@ -4581,9 +4569,11 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); - STimeWindow* w = &pBlock->info.window; - w->skey = *(int64_t*)pInfoData->pData; - w->ekey = *(int64_t*)(pInfoData->pData + TSDB_KEYSIZE * (pBlock->info.rows - 1)); + if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + STimeWindow* w = &pBlock->info.window; + w->skey = *(int64_t*)pInfoData->pData; + w->ekey = *(int64_t*)(pInfoData->pData + TSDB_KEYSIZE * (pBlock->info.rows - 1)); + } } static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv, @@ -5223,7 +5213,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); if (onlyQueryTags(pQuery)) { - pRuntimeEnv->proot = createTagScanOperator(pRuntimeEnv); + pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv); } else if (needReverseScan(pQuery)) { pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { @@ -6204,25 +6194,25 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf assert(pTableScanInfo != NULL && pDownstream != NULL); char* name = pDownstream->name; - if ((strcasecmp(name, "AggregationOp") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { + if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { SAggOperatorInfo* pAggInfo = pDownstream->info; pTableScanInfo->pCtx = pAggInfo->pCtx; pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset; - } else if (strcasecmp(name, "HashIntervalAggOp") == 0) { + } else if (strcasecmp(name, "HashIntervalAgg") == 0) { SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->info; pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; - } else if (strcasecmp(name, "HashGroupbyAggOp") == 0) { + } else if (strcasecmp(name, "HashGroupbyAgg") == 0) { SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; - pTableScanInfo->pCtx = pGroupbyInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->rowCellInfoOffset; + pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; + pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { SHashIntervalOperatorInfo *pInfo = pDownstream->info; @@ -6234,10 +6224,9 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } else if (strcasecmp(name, "ArithmeticOp") == 0) { SArithOperatorInfo *pInfo = pDownstream->info; - pTableScanInfo->pCtx = pInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; - + pTableScanInfo->pCtx = pInfo->binfo.pCtx; + pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; } else { assert(0); } @@ -6304,17 +6293,15 @@ static SSDataBlock* doAggregate(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); - aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); + setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); + doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); } pOperator->completed = true; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); - pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput); - destroySQLFunctionCtx(pAggInfo->pCtx, pAggInfo->pRes->info.numOfCols); return pAggInfo->pRes; } @@ -6359,11 +6346,11 @@ static SSDataBlock* doSTableAggregate(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); + setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); - aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); + doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); } closeAllResultRows(&pAggInfo->resultRowInfo); @@ -6375,7 +6362,6 @@ static SSDataBlock* doSTableAggregate(void* param) { pOperator->completed = true; } - destroySQLFunctionCtx(pAggInfo->pCtx, pAggInfo->pRes->info.numOfCols); return pAggInfo->pRes; } @@ -6385,10 +6371,8 @@ static SSDataBlock* doArithmeticOperation(void* param) { SArithOperatorInfo* pArithInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput, pArithInfo->rowCellInfoOffset); - pRuntimeEnv->pQuery->pos = 0; - pArithInfo->pOutput->info.rows = 0; + pArithInfo->binfo.pRes->info.rows = 0; while(1) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); @@ -6397,39 +6381,20 @@ static SSDataBlock* doArithmeticOperation(void* param) { break; } - setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->binfo.pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pArithInfo->pCtx[i].size = pBlock->info.rows; - if (pArithInfo->pCtx[i].functionId == TSDB_FUNC_ARITHM) { - setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); - } else { - SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { - for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j); - if (p->info.colId == pCol->colId) { - pArithInfo->pCtx[i].pInput = p->pData; - pArithInfo->pCtx[i].inputType = p->info.type; - pArithInfo->pCtx[i].inputBytes = p->info.bytes; - break; - } - } - } - } - } - + setInputDataBlock(pOperator, pArithInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order); updateOutputBuf(pArithInfo, pBlock->info.rows); - arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->pCtx, pOperator->pExpr, pOperator->numOfOutput); - pArithInfo->pOutput->info.rows += pBlock->info.rows; + arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput); - if (pArithInfo->pOutput->info.rows >= 4096) { + pArithInfo->binfo.pRes->info.rows += pBlock->info.rows; + if (pArithInfo->binfo.pRes->info.rows >= 4096) { break; } } - return pArithInfo->pOutput; + return pArithInfo->binfo.pRes; } static SSDataBlock* doLimit(void* param) { @@ -6523,7 +6488,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); + setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pIntervalInfo, pBlock, 0); } @@ -6581,7 +6546,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; setTagVal_rv(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); + setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex); @@ -6609,11 +6574,11 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (hasRemainData(&pRuntimeEnv->groupResInfo)) { - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); - if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->completed = true; } - return pInfo->pRes; + return pInfo->binfo.pRes; } SOperatorInfo* upstream = pOperator->upstream; @@ -6626,7 +6591,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order); if (pInfo->colIndex == -1) { pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); } @@ -6634,23 +6599,23 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); } - closeAllResultRows(&pInfo->resultRowInfo); + closeAllResultRows(&pInfo->binfo.resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); if (!pRuntimeEnv->pQuery->stableQuery) { - finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + finalizeQueryResult_rv(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); } - updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo, 0); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->completed = true; } - return pInfo->pRes; + return pInfo->binfo.pRes; } static SSDataBlock* doFill(void* param) { @@ -6702,7 +6667,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->cleanup != NULL) { - pOperator->cleanup(pOperator->info); + pOperator->cleanup(pOperator->info, pOperator->numOfOutput); } destroyOperatorInfo(pOperator->upstream); @@ -6720,47 +6685,81 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "Aggregate"; + pOperator->name = "TableAggregate"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->info = pInfo; pOperator->upstream = upstream; - pOperator->exec = doAggregate; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doAggregate; + pOperator->cleanup = destroyBasicOperatorInfo; return pOperator; } -static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { + assert(pInfo != NULL); + + destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); + tfree(pInfo->rowCellInfoOffset); + + cleanupResultRowInfo(&pInfo->resultRowInfo); + pInfo->pRes = destroyOutputBuf(pInfo->pRes); +} + +static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { + SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; + doDestroyBasicInfo(pInfo, numOfOutput); +} + +static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { + SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; + pInfo->pRes = destroyOutputBuf(pInfo->pRes); +} + +static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { + SHashGroupbyOperatorInfo* pInfo = (SHashGroupbyOperatorInfo*) param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); +} + +static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { + SArithOperatorInfo* pInfo = (SArithOperatorInfo*) param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); +} + +SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "STableAggregate"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->info = pInfo; pOperator->upstream = upstream; - pOperator->exec = doSTableAggregate; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doSTableAggregate; + pOperator->cleanup = destroyBasicOperatorInfo; + return pOperator; } -static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); - pInfo->pOutput = createOutputBuf(pExpr, numOfOutput); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, pInfo->binfo.rowCellInfoOffset); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOp"; @@ -6768,15 +6767,17 @@ static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp pOperator->completed = false; pOperator->info = pInfo; pOperator->upstream = upstream; - pOperator->exec = doArithmeticOperation; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doArithmeticOperation; + pOperator->cleanup = destroyArithOperatorInfo; + return pOperator; } -static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); pInfo->limit = pRuntimeEnv->pQuery->limit.limit; @@ -6793,7 +6794,7 @@ static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp return pOperator; } -static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); pInfo->offset = pRuntimeEnv->pQuery->limit.offset; @@ -6812,7 +6813,7 @@ static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO return pOperator; } -static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6825,35 +6826,37 @@ static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEn pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; - pOperator->exec = doHashIntervalAgg; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doHashIntervalAgg; + pOperator->cleanup = destroyBasicOperatorInfo; + return pOperator; } -static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "STableIntervalAggOp"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; - pOperator->exec = doSTableIntervalAgg; - pOperator->pExpr = pQuery->pExpr1; - pOperator->numOfOutput = pQuery->numOfOutput; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doSTableIntervalAgg; + pOperator->cleanup = destroyBasicOperatorInfo; + return pOperator; } @@ -6861,26 +6864,27 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - - pOperator->name = "HashGroupby"; + pOperator->name = "HashGroupbyAgg"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; - pOperator->exec = doHashGroupbyAgg; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doHashGroupbyAgg; + pOperator->cleanup = destroyGroupbyOperatorInfo; + return pOperator; } -static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, +SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput); @@ -6891,12 +6895,14 @@ static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe pOperator->blockingOptr = false; pOperator->completed = false; pOperator->upstream = upstream; - pOperator->exec = doFill; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doFill; + pOperator->cleanup = destroySFillOperatorInfo; + return pOperator; } @@ -7035,7 +7041,7 @@ static SSDataBlock* doTagScan(void* param) { return pTagScanInfo->pRes; } -static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7432,6 +7438,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->colType = htons(pExprMsg->colType); + pExprMsg->colBytes = htons(pExprMsg->colBytes); + pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pExprMsg->resColId = htons(pExprMsg->resColId); @@ -7471,6 +7480,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); + pExprMsg->colType = htons(pExprMsg->colType); + pExprMsg->colBytes = htons(pExprMsg->colBytes); + pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); @@ -8036,7 +8048,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr // allocate additional memory for interResults that are usually larger then final results // TODO refactor int16_t bytes = 0; - if (pQuery->pExpr2 == NULL || col > pQuery->numOfExpr2) { + if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) { bytes = pExprs[col].bytes; } else { bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes); diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 5b5a7b6656..a3c69c7707 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -279,7 +279,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t int32_t k = 0; for (int32_t i = 0; i < numOfCols; ++i) { SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; - pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity); + pFillInfo->pData[i] = NULL; if (TSDB_COL_IS_TAG(pColInfo->flag)) { bool exists = false; @@ -377,10 +377,10 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { tfree(pFillInfo->prevValues); tfree(pFillInfo->nextValues); tfree(pFillInfo->pTags); - - for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - tfree(pFillInfo->pData[i]); - } + +// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { +// tfree(pFillInfo->pData[i]); +// } tfree(pFillInfo->pData); tfree(pFillInfo->pFillCol); diff --git a/tests/script/general/parser/fill.sim b/tests/script/general/parser/fill.sim index 0fd6f2e4f8..ca113fb34b 100644 --- a/tests/script/general/parser/fill.sim +++ b/tests/script/general/parser/fill.sim @@ -980,10 +980,6 @@ if $data00 != @20-01-01 01:01:00.000@ then return -1 endi -if $data00 != @20-01-01 01:01:00.000@ then - return -1 -endi -if $data1 if $data01 != 2.000000000 then return -1 endi