From 199a3610f774ec04a3dec10b9fdf9912dd650ae7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Apr 2022 09:44:39 +0800 Subject: [PATCH] fix: fix some syntax errors. --- source/dnode/vnode/src/tq/tqPush.c | 14 -- source/libs/executor/inc/executorimpl.h | 21 ++- source/libs/executor/src/executorimpl.c | 227 ------------------------ source/libs/executor/src/scanoperator.c | 164 +++++++++++++++++ source/libs/function/inc/builtinsimpl.h | 2 +- source/libs/function/src/builtinsimpl.c | 35 +--- 6 files changed, 178 insertions(+), 285 deletions(-) delete mode 100644 source/dnode/vnode/src/tq/tqPush.c diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c deleted file mode 100644 index f2f48bbc8a..0000000000 --- a/source/dnode/vnode/src/tq/tqPush.c +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c6d209e706..db0dd1ffbb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -347,10 +347,11 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STagScanInfo { - SColumnInfo* pCols; - SSDataBlock* pRes; + SColumnInfo *pCols; + SSDataBlock *pRes; int32_t totalTables; int32_t curPos; + void *pReader; } STagScanInfo; typedef struct SStreamBlockScanInfo { @@ -376,13 +377,11 @@ typedef struct SSysTableScanInfo { SEpSet epSet; tsem_t ready; - int32_t accountId; - bool showRewrite; - SNode* pCondition; // db_name filter condition, to discard data that are not in current database - void* pCur; // cursor for iterate the local table meta store. - SArray* scanCols; // SArray scan column id list - -// int32_t type; // show type, TODO remove it + int32_t accountId; + bool showRewrite; + SNode* pCondition; // db_name filter condition, to discard data that are not in current database + void* pCur; // cursor for iterate the local table meta store. + SArray* scanCols; // SArray scan column id list SName name; SSDataBlock* pRes; int32_t capacity; @@ -628,7 +627,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, +SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, @@ -668,12 +667,12 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); #endif void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 32d24e1975..3e58326cc9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -235,7 +235,6 @@ static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDi static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); -static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo); @@ -298,26 +297,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { return pBlock; } -static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput) { - return true; - // bool hasTags = false; - // int32_t numOfSelectivity = 0; - // - // for (int32_t i = 0; i < numOfOutput; ++i) { - // int32_t functId = pCtx[i].functionId; - // if (functId == FUNCTION_TAG_DUMMY || functId == FUNCTION_TS_DUMMY) { - // hasTags = true; - // continue; - // } - // - // if ((aAggs[functId].status & FUNCSTATE_SELECTIVITY) != 0) { - // numOfSelectivity++; - // } - // } - // - // return (numOfSelectivity > 0 && hasTags); -} - static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1858,10 +1837,6 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* // set the output buffer for the selectivity + tag query static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { - if (!isSelectivityWithTagsQuery(pCtx, numOfOutput)) { - return TSDB_CODE_SUCCESS; - } - int32_t num = 0; int16_t tagLen = 0; @@ -2111,25 +2086,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { return status; } -static void doUpdateLastKey(STaskAttr* pQueryAttr) { - STimeWindow* win = &pQueryAttr->window; - - size_t num = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList); - for (int32_t i = 0; i < num; ++i) { - SArray* p1 = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); - - size_t len = taosArrayGetSize(p1); - for (int32_t j = 0; j < len; ++j) { - // STableKeyInfo* pInfo = taosArrayGet(p1, j); - // - // // update the new lastkey if it is equalled to the value of the old skey - // if (pInfo->lastKey == win->ekey) { - // pInfo->lastKey = win->skey; - // } - } - } -} - // static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) { // STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; // @@ -3075,33 +3031,6 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p pAggInfo->groupId = groupId; } -void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - SExprBasicInfo* pExpr = &pExprInfo->base; - // if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) && - // (pExpr->functionId == FUNCTION_TS || pExpr->functionId == FUNCTION_PRJ) && - // (pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_ID)) { - // assert(pExpr->numOfParams == 1); - // - // int16_t tagColId = (int16_t)pExprInfo->base.param[0].i; - // SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId); - // - // doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes); - // - // int16_t tagType = pCtx[0].tag.nType; - // if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { - // //qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%s", - // GET_TASKID(pRuntimeEnv), - //// pExprInfo->base.param[0].i, pCtx[0].tag.pz); - // } else { - // //qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, - // GET_TASKID(pRuntimeEnv), - //// pExprInfo->base.param[0].i, pCtx[0].tag.i); - // } - // } -} - /* * There are two cases to handle: * @@ -5863,11 +5792,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { - STagScanInfo* pInfo = (STagScanInfo*)param; - pInfo->pRes = blockDataDestroy(pInfo->pRes); -} - static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); @@ -6221,157 +6145,6 @@ _error: return NULL; } -static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) { -#if 0 - SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - int32_t maxNumOfTables = (int32_t)pResultInfo->capacity; - - STagScanInfo *pInfo = pOperator->info; - SSDataBlock *pRes = pInfo->pRes; - *newgroup = false; - - int32_t count = 0; - SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); - - int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]); - if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id - assert(pQueryAttr->numOfOutput == 1); - - SExprInfo* pExprInfo = &pOperator->pExpr[0]; - int32_t rsize = pExprInfo->base.resSchema.bytes; - - count = 0; - - int16_t bytes = pExprInfo->base.resSchema.bytes; - int16_t type = pExprInfo->base.resSchema.type; - - for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) { - if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) { - bytes = pQueryAttr->tagColList[i].bytes; - type = pQueryAttr->tagColList[i].type; - break; - } - } - - SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); - - while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { - int32_t i = pInfo->curPos++; - STableQueryInfo *item = taosArrayGetP(pa, i); - - char *output = pColInfo->pData + count * rsize; - varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); - - output = varDataVal(output); - STableId* id = TSDB_TABLEID(item->pTable); - - *(int16_t *)output = 0; - output += sizeof(int16_t); - - *(int64_t *)output = id->uid; // memory align problem, todo serialize - output += sizeof(id->uid); - - *(int32_t *)output = id->tid; - output += sizeof(id->tid); - - *(int32_t *)output = pQueryAttr->vgId; - output += sizeof(pQueryAttr->vgId); - - char* data = NULL; - if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) { - data = tsdbGetTableName(item->pTable); - } else { - data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes); - } - - doSetTagValueToResultBuf(output, data, type, bytes); - count += 1; - } - - //qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count); - } else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query - SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); - *(int64_t*)pColInfo->pData = pInfo->totalTables; - count = 1; - - pOperator->status = OP_EXEC_DONE; - //qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count); - } else { // return only the tags|table name etc. - SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo - - count = 0; - while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { - int32_t i = pInfo->curPos++; - - STableQueryInfo* item = taosArrayGetP(pa, i); - - char *data = NULL, *dst = NULL; - int16_t type = 0, bytes = 0; - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { - // not assign value in case of user defined constant output column - if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) { - continue; - } - - SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j); - type = pExprInfo[j].base.resSchema.type; - bytes = pExprInfo[j].base.resSchema.bytes; - - if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) { - data = tsdbGetTableName(item->pTable); - } else { - data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes); - } - - dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes; - doSetTagValueToResultBuf(dst, data, type, bytes); - } - - count += 1; - } - - if (pInfo->curPos >= pInfo->totalTables) { - pOperator->status = OP_EXEC_DONE; - } - - //qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); - } - - if (pOperator->status == OP_EXEC_DONE) { - setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED); - } - - pRes->info.rows = count; - return (pRes->info.rows == 0)? NULL:pInfo->pRes; - -#endif - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { - STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); - size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); - assert(numOfGroup == 0 || numOfGroup == 1); - - pInfo->curPos = 0; - - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "SeqTableTagScan"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->getNextFn = doTagScan; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->closeFn = destroyTagScanOperatorInfo; - - return pOperator; -} static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { int32_t j = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3f3f6a6fb3..a70e402871 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -980,3 +980,167 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB return pOperator; } + +static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) { +#if 0 + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + int32_t maxNumOfTables = (int32_t)pResultInfo->capacity; + + STagScanInfo *pInfo = pOperator->info; + SSDataBlock *pRes = pInfo->pRes; + *newgroup = false; + + int32_t count = 0; + SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); + + int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]); + if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id + assert(pQueryAttr->numOfOutput == 1); + + SExprInfo* pExprInfo = &pOperator->pExpr[0]; + int32_t rsize = pExprInfo->base.resSchema.bytes; + + count = 0; + + int16_t bytes = pExprInfo->base.resSchema.bytes; + int16_t type = pExprInfo->base.resSchema.type; + + for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) { + if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) { + bytes = pQueryAttr->tagColList[i].bytes; + type = pQueryAttr->tagColList[i].type; + break; + } + } + + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); + + while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { + int32_t i = pInfo->curPos++; + STableQueryInfo *item = taosArrayGetP(pa, i); + + char *output = pColInfo->pData + count * rsize; + varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); + + output = varDataVal(output); + STableId* id = TSDB_TABLEID(item->pTable); + + *(int16_t *)output = 0; + output += sizeof(int16_t); + + *(int64_t *)output = id->uid; // memory align problem, todo serialize + output += sizeof(id->uid); + + *(int32_t *)output = id->tid; + output += sizeof(id->tid); + + *(int32_t *)output = pQueryAttr->vgId; + output += sizeof(pQueryAttr->vgId); + + char* data = NULL; + if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) { + data = tsdbGetTableName(item->pTable); + } else { + data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes); + } + + doSetTagValueToResultBuf(output, data, type, bytes); + count += 1; + } + + //qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count); + } else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); + *(int64_t*)pColInfo->pData = pInfo->totalTables; + count = 1; + + pOperator->status = OP_EXEC_DONE; + //qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count); + } else { // return only the tags|table name etc. + SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo + + count = 0; + while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { + int32_t i = pInfo->curPos++; + + STableQueryInfo* item = taosArrayGetP(pa, i); + + char *data = NULL, *dst = NULL; + int16_t type = 0, bytes = 0; + for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + // not assign value in case of user defined constant output column + if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) { + continue; + } + + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j); + type = pExprInfo[j].base.resSchema.type; + bytes = pExprInfo[j].base.resSchema.bytes; + + if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) { + data = tsdbGetTableName(item->pTable); + } else { + data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes); + } + + dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes; + doSetTagValueToResultBuf(dst, data, type, bytes); + } + + count += 1; + } + + if (pInfo->curPos >= pInfo->totalTables) { + pOperator->status = OP_EXEC_DONE; + } + + //qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); + } + + if (pOperator->status == OP_EXEC_DONE) { + setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED); + } + + pRes->info.rows = count; + return (pRes->info.rows == 0)? NULL:pInfo->pRes; + +#endif + return TSDB_CODE_SUCCESS; +} + +static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { + STagScanInfo* pInfo = (STagScanInfo*)param; + pInfo->pRes = blockDataDestroy(pInfo->pRes); +} + +SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pReader = pReaderHandle; + pInfo->curPos = 0; + pOperator->name = "TagScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->getNextFn = doTagScan; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; + pOperator->closeFn = destroyTagScanOperatorInfo; + + return pOperator; + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index d424ed41bc..0ad3730b5a 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -43,7 +43,7 @@ int32_t maxFunction(SqlFunctionCtx *pCtx); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunction(SqlFunctionCtx* pCtx); -int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); +int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 129bebe174..0b9765ef15 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -44,8 +44,6 @@ typedef struct STopBotResItem { } STopBotResItem; typedef struct STopBotRes { - int32_t pageId; -// int32_t num; STopBotResItem *pItems; } STopBotRes; @@ -92,18 +90,6 @@ typedef struct SDiffInfo { } \ } while (0); -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0) - #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ do { \ if (((left) < (right)) ^ (sign)) { \ @@ -407,7 +393,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { +int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; int32_t type = pInput->pData[0]->info.type; SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); @@ -417,7 +403,7 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count); } - return functionFinalize(pCtx, pBlock, slotId); + return functionFinalize(pCtx, pBlock); } EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){ @@ -841,7 +827,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { +int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); @@ -1383,21 +1369,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } } -typedef struct STopBotResItem { - SVariant v; - uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data - struct { - int32_t pageId; - int32_t offset; - } tuplePos; // tuple data of this chosen row -} STopBotResItem; - -typedef struct STopBotRes { -// int32_t pageId; -// int32_t num; - STopBotResItem *pItems; -} STopBotRes; - bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);