From fa3fce9c9deab7741071591a4c2cf2f9a734295e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Mar 2022 15:53:30 +0800 Subject: [PATCH] [td-13039] refactor. --- include/client/taos.h | 40 +++++----- source/libs/executor/inc/executorimpl.h | 10 +-- source/libs/executor/src/executorimpl.c | 102 +++++++++++++++++------- 3 files changed, 96 insertions(+), 56 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 8b1517c6ff..d3edd93c37 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -31,26 +31,26 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar -#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte -#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 15 // string -#define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_JSON 17 // json -#define TSDB_DATA_TYPE_DECIMAL 18 // decimal -#define TSDB_DATA_TYPE_BLOB 19 // binary +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar +#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_JSON 15 // json string +#define TSDB_DATA_TYPE_VARCHAR 16 // string +#define TSDB_DATA_TYPE_VARBINARY 17 // binary +#define TSDB_DATA_TYPE_DECIMAL 18 // decimal +#define TSDB_DATA_TYPE_BLOB 19 // binary typedef enum { TSDB_OPTION_LOCALE, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 991cd372c3..2280295c13 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -264,7 +264,8 @@ typedef struct SExecTaskInfo { uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string - jmp_buf env; // + jmp_buf env; // when error occurs, abort + int32_t bufSize; // available buffer size for all operator struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -322,6 +323,7 @@ typedef struct SOperatorInfo { SExprInfo* pExpr; STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; + SOperatorCostInfo cost; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator @@ -620,6 +622,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); @@ -655,10 +659,6 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); - -// SSDataBlock* doSLimit(void* param, bool* newgroup); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3ebad151fd..675fd3e39f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -208,6 +208,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); +static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); +static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); + static void destroyOperatorInfo(SOperatorInfo* pOperator); static void doSetOperatorCompleted(SOperatorInfo* pOperator) { @@ -217,6 +220,10 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } +static void dummyOperatorOpenFn() { + return; +} + static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); @@ -5236,28 +5243,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); -SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { - SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - - if (pInfo == NULL || pOperator == NULL) { - tfree(pInfo); - tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - size_t numOfSources = taosArrayGetSize(pSources); - - pInfo->pSources = taosArrayDup(pSources); +static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); - if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { - tfree(pInfo); - tfree(pOperator); - taosArrayDestroy(pInfo->pSources); - taosArrayDestroy(pInfo->pSourceDataInfo); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + if (pInfo->pSourceDataInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } for(int32_t i = 0; i < numOfSources; ++i) { @@ -5266,13 +5255,41 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* dataInfo.pEx = pInfo; dataInfo.index = i; - taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + if (ret == NULL) { + taosArrayDestroy(pInfo->pSourceDataInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } } - size_t size = taosArrayGetSize(pExprInfo); - pInfo->pResult = createResultDataBlock(pExprInfo); - pInfo->seqLoadData = true; + return TSDB_CODE_SUCCESS; +} +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { + SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pSources = taosArrayDup(pSources); + if (pInfo->pSources == NULL) { + goto _error; + } + + size_t numOfSources = taosArrayGetSize(pSources); + int32_t code = initDataSource(numOfSources, pInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->pResult = createResultDataBlock(pExprInfo); + if (pInfo->pResult == NULL) { + goto _error; + } + + pInfo->seqLoadData = true; // sequentially load data from the source node tsem_init(&pInfo->ready, 0, 0); pOperator->name = "ExchangeOperator"; @@ -5280,9 +5297,11 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = size; - pOperator->nextDataFn = doLoadRemoteData; + pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; + pOperator->openFn = NULL; // assign a dummy function. + pOperator->nextDataFn = doLoadRemoteData; + pOperator->closeFn = destroyExchangeOperatorInfo; #if 1 { // todo refactor @@ -5308,6 +5327,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* #endif return pOperator; + + _error: + if (pInfo != NULL) { + destroyExchangeOperatorInfo(pInfo, 0); + } + + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { @@ -7115,17 +7144,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevData); } -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { +void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { +void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); @@ -7145,6 +7174,17 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } +void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { + SExchangeInfo* pExInfo = (SExchangeInfo*) param; + taosArrayDestroy(pExInfo->pSources); + taosArrayDestroy(pExInfo->pSourceDataInfo); + if (pExInfo->pResult != NULL) { + blockDataDestroy(pExInfo->pResult); + } + + tsem_destroy(&pExInfo->ready); +} + SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -7268,7 +7308,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->nextDataFn = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; }