From fa3fce9c9deab7741071591a4c2cf2f9a734295e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Mar 2022 15:53:30 +0800 Subject: [PATCH 1/5] [td-13039] refactor. --- include/client/taos.h | 40 +++++----- source/libs/executor/inc/executorimpl.h | 10 +-- source/libs/executor/src/executorimpl.c | 102 +++++++++++++++++------- 3 files changed, 96 insertions(+), 56 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 8b1517c6ff..d3edd93c37 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -31,26 +31,26 @@ typedef void TAOS_SUB; typedef void **TAOS_ROW; // Data type definition -#define TSDB_DATA_TYPE_NULL 0 // 1 bytes -#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes -#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte -#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes -#define TSDB_DATA_TYPE_INT 4 // 4 bytes -#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes -#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes -#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes -#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar -#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes -#define TSDB_DATA_TYPE_NCHAR 10 // unicode string -#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte -#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes -#define TSDB_DATA_TYPE_UINT 13 // 4 bytes -#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes -#define TSDB_DATA_TYPE_VARCHAR 15 // string -#define TSDB_DATA_TYPE_VARBINARY 16 // binary -#define TSDB_DATA_TYPE_JSON 17 // json -#define TSDB_DATA_TYPE_DECIMAL 18 // decimal -#define TSDB_DATA_TYPE_BLOB 19 // binary +#define TSDB_DATA_TYPE_NULL 0 // 1 bytes +#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes +#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte +#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes +#define TSDB_DATA_TYPE_INT 4 // 4 bytes +#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes +#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes +#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes +#define TSDB_DATA_TYPE_BINARY 8 // string, alias for varchar +#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes +#define TSDB_DATA_TYPE_NCHAR 10 // unicode string +#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte +#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes +#define TSDB_DATA_TYPE_UINT 13 // 4 bytes +#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes +#define TSDB_DATA_TYPE_JSON 15 // json string +#define TSDB_DATA_TYPE_VARCHAR 16 // string +#define TSDB_DATA_TYPE_VARBINARY 17 // binary +#define TSDB_DATA_TYPE_DECIMAL 18 // decimal +#define TSDB_DATA_TYPE_BLOB 19 // binary typedef enum { TSDB_OPTION_LOCALE, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 991cd372c3..2280295c13 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -264,7 +264,8 @@ typedef struct SExecTaskInfo { uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string - jmp_buf env; // + jmp_buf env; // when error occurs, abort + int32_t bufSize; // available buffer size for all operator struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -322,6 +323,7 @@ typedef struct SOperatorInfo { SExprInfo* pExpr; STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; + SOperatorCostInfo cost; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator @@ -620,6 +622,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); @@ -655,10 +659,6 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); - -// SSDataBlock* doSLimit(void* param, bool* newgroup); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3ebad151fd..675fd3e39f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -208,6 +208,9 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); +static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); +static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); + static void destroyOperatorInfo(SOperatorInfo* pOperator); static void doSetOperatorCompleted(SOperatorInfo* pOperator) { @@ -217,6 +220,10 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } +static void dummyOperatorOpenFn() { + return; +} + static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); @@ -5236,28 +5243,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); -SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { - SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - - if (pInfo == NULL || pOperator == NULL) { - tfree(pInfo); - tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - size_t numOfSources = taosArrayGetSize(pSources); - - pInfo->pSources = taosArrayDup(pSources); +static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); - if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { - tfree(pInfo); - tfree(pOperator); - taosArrayDestroy(pInfo->pSources); - taosArrayDestroy(pInfo->pSourceDataInfo); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + if (pInfo->pSourceDataInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } for(int32_t i = 0; i < numOfSources; ++i) { @@ -5266,13 +5255,41 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* dataInfo.pEx = pInfo; dataInfo.index = i; - taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); + if (ret == NULL) { + taosArrayDestroy(pInfo->pSourceDataInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } } - size_t size = taosArrayGetSize(pExprInfo); - pInfo->pResult = createResultDataBlock(pExprInfo); - pInfo->seqLoadData = true; + return TSDB_CODE_SUCCESS; +} +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { + SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->pSources = taosArrayDup(pSources); + if (pInfo->pSources == NULL) { + goto _error; + } + + size_t numOfSources = taosArrayGetSize(pSources); + int32_t code = initDataSource(numOfSources, pInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->pResult = createResultDataBlock(pExprInfo); + if (pInfo->pResult == NULL) { + goto _error; + } + + pInfo->seqLoadData = true; // sequentially load data from the source node tsem_init(&pInfo->ready, 0, 0); pOperator->name = "ExchangeOperator"; @@ -5280,9 +5297,11 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = size; - pOperator->nextDataFn = doLoadRemoteData; + pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; + pOperator->openFn = NULL; // assign a dummy function. + pOperator->nextDataFn = doLoadRemoteData; + pOperator->closeFn = destroyExchangeOperatorInfo; #if 1 { // todo refactor @@ -5308,6 +5327,16 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* #endif return pOperator; + + _error: + if (pInfo != NULL) { + destroyExchangeOperatorInfo(pInfo, 0); + } + + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { @@ -7115,17 +7144,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevData); } -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { +void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { +void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); @@ -7145,6 +7174,17 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } +void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { + SExchangeInfo* pExInfo = (SExchangeInfo*) param; + taosArrayDestroy(pExInfo->pSources); + taosArrayDestroy(pExInfo->pSourceDataInfo); + if (pExInfo->pResult != NULL) { + blockDataDestroy(pExInfo->pResult); + } + + tsem_destroy(&pExInfo->ready); +} + SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -7268,7 +7308,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->nextDataFn = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } From 9c610369e27f7cb04bbdedf9d0db7efa9e3d2dc9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 5 Mar 2022 15:58:28 +0800 Subject: [PATCH 2/5] [td-13039] refactor. --- include/libs/executor/dataSinkMgt.h | 1 - source/libs/executor/inc/executorimpl.h | 49 ++-- source/libs/executor/src/dataDispatcher.c | 19 +- source/libs/executor/src/executorMain.c | 2 +- source/libs/executor/src/executorimpl.c | 267 +++++++++----------- source/libs/executor/test/executorTests.cpp | 12 +- source/libs/qworker/src/qworker.c | 2 +- 7 files changed, 150 insertions(+), 202 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 19438b5dd4..08ae2212bc 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -40,7 +40,6 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); typedef struct SInputData { const struct SSDataBlock* pData; - SHashObj* pTableRetrieveTsMap; } SInputData; typedef struct SOutputData { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2280295c13..53372cb2e3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -253,6 +253,11 @@ typedef struct STaskIdInfo { char* str; } STaskIdInfo; +typedef struct STaskBufInfo { + int32_t bufSize; // total available buffer size in bytes + int32_t remainBuf; // remain buffer size +} STaskBufInfo; + typedef struct SExecTaskInfo { STaskIdInfo id; char* content; @@ -265,7 +270,7 @@ typedef struct SExecTaskInfo { STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string jmp_buf env; // when error occurs, abort - int32_t bufSize; // available buffer size for all operator + STaskBufInfo bufInfo; // available buffer info this task struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -308,9 +313,11 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_IN_EXECUTING = 1, - OP_RES_TO_RETURN = 2, - OP_EXEC_DONE = 3, + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, + OP_IN_EXECUTING = 0x3, + OP_RES_TO_RETURN = 0x5, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorInfo { @@ -328,7 +335,7 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator __optr_open_fn_t openFn; - __optr_fn_t nextDataFn; + __optr_fn_t getNextFn; __optr_close_fn_t closeFn; } SOperatorInfo; @@ -380,9 +387,9 @@ typedef struct STaskParam { } STaskParam; enum { - DATA_NOT_READY = 0x1, - DATA_READY = 0x2, - DATA_EXHAUSTED = 0x3, + EX_SOURCE_DATA_NOT_READY = 0x1, + EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_EXHAUSTED = 0x3, }; typedef struct SSourceDataInfo { @@ -407,16 +414,16 @@ typedef struct SExchangeInfo { } SExchangeInfo; typedef struct STableScanInfo { - void* pTsdbReadHandle; - int32_t numOfBlocks; // extract basic running information. - int32_t numOfSkipped; - int32_t numOfBlockStatis; - int64_t numOfRows; + void* pTsdbReadHandle; + int32_t numOfBlocks; // extract basic running information. + int32_t numOfSkipped; + int32_t numOfBlockStatis; + int64_t numOfRows; - int32_t order; // scan order - int32_t times; // repeat counts - int32_t current; - int32_t reverseTimes; // 0 by default + int32_t order; // scan order + int32_t times; // repeat counts + int32_t current; + int32_t reverseTimes; // 0 by default SqlFunctionCtx* pCtx; // next operator query context SResultRowInfo* pResultRowInfo; @@ -427,7 +434,7 @@ typedef struct STableScanInfo { int64_t elapsedTime; int32_t prevGroupId; // previous table group id - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan } STableScanInfo; typedef struct STagScanInfo { @@ -624,9 +631,9 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -672,13 +679,9 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, - SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); - int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo* pUdfInfo); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3623f5947d..a9865e5dec 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -21,8 +21,6 @@ #include "tqueue.h" #include "executorimpl.h" -#define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp)) - typedef struct SDataDispatchBuf { int32_t useSize; int32_t allocSize; @@ -88,19 +86,6 @@ static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, data += pColRes->info.bytes * pInput->pData->info.rows; } } - - int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap); - *(int32_t*)data = htonl(numOfTables); - data += sizeof(int32_t); - - STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL); - while (item) { - STableIdInfo* pDst = (STableIdInfo*)data; - pDst->uid = htobe64(item->uid); - pDst->key = htobe64(item->key); - data += sizeof(STableIdInfo); - item = taosHashIterate(pInput->pTableRetrieveTsMap, item); - } } // data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ... @@ -111,7 +96,7 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat pEntry->numOfRows = pInput->pData->info.rows; pEntry->dataLen = 0; - pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); + pBuf->useSize = sizeof(SRetrieveTableRsp); copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen); if (0 == pEntry->compressed) { pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows; @@ -128,7 +113,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; + pBuf->allocSize = sizeof(SRetrieveTableRsp) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; pBuf->pData = malloc(pBuf->allocSize); if (pBuf->pData == NULL) { qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index fabaa2d31d..7e55a4b3e1 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t st = 0; st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 675fd3e39f..197bb97d3a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -220,10 +220,14 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } -static void dummyOperatorOpenFn() { - return; +static int32_t operatorDummyOpenFn(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + pOperator->status = OP_OPENED; + return TSDB_CODE_SUCCESS; } +static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} + static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); @@ -4940,7 +4944,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) pRsp->useconds = htobe64(pRsp->useconds); pRsp->compLen = htonl(pRsp->compLen); - pSourceDataInfo->status = DATA_READY; + pSourceDataInfo->status = EX_SOURCE_DATA_READY; tsem_post(&pSourceDataInfo->pEx->ready); } @@ -5068,12 +5072,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == DATA_EXHAUSTED) { + if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; continue; } - if (pDataInfo->status != DATA_READY) { + if (pDataInfo->status != EX_SOURCE_DATA_READY) { continue; } @@ -5086,7 +5090,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, pExchangeInfo->totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; continue; } @@ -5102,15 +5106,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize); } - if (pDataInfo->status != DATA_EXHAUSTED) { - pDataInfo->status = DATA_NOT_READY; + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5130,14 +5134,10 @@ _error: return NULL; } -static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { +static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - if (pOperator->status == OP_RES_TO_RETURN) { - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); - } - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); int64_t startTs = taosGetTimestampUs(); @@ -5145,7 +5145,8 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { for(int32_t i = 0; i < totalSources; ++i) { int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { - return NULL; + pTaskInfo->code = code; + return code; } } @@ -5153,9 +5154,9 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); tsem_wait(&pExchangeInfo->ready); + pOperator->cost.openCost = taosGetTimestampUs() - startTs; - pOperator->status = OP_RES_TO_RETURN; - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + return TSDB_CODE_SUCCESS; } static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { @@ -5183,7 +5184,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pDataInfo->totalRows, pExchangeInfo->totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; continue; } @@ -5198,7 +5199,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, @@ -5209,6 +5210,26 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { } } +static int32_t prepareLoadRemoteData(void* param) { + SOperatorInfo *pOperator = (SOperatorInfo*) param; + if ((pOperator->status & OP_OPENED) == OP_OPENED) { + return TSDB_CODE_SUCCESS; + } + + SExchangeInfo *pExchangeInfo = pOperator->info; + if (pExchangeInfo->seqLoadData) { + // do nothing for sequentially load data + } else { + int32_t code = prepareConcurrentlyLoad(pOperator); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + pOperator->status = OP_OPENED; + return TSDB_CODE_SUCCESS; +} + static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; @@ -5216,6 +5237,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + if ((pOperator->status & OP_OPENED) != OP_OPENED) { + pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; // TODO add a new error code + return NULL; + } if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, @@ -5224,11 +5249,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { } *newgroup = false; - if (pExchangeInfo->seqLoadData) { return seqLoadRemoteData(pOperator); } else { - return concurrentlyLoadRemoteData(pOperator); + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } #if 0 @@ -5251,7 +5275,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { for(int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; - dataInfo.status = DATA_NOT_READY; + dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.pEx = pInfo; dataInfo.index = i; @@ -5295,12 +5319,12 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->name = "ExchangeOperator"; pOperator->operatorType = OP_Exchange; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->openFn = NULL; // assign a dummy function. - pOperator->nextDataFn = doLoadRemoteData; + pOperator->openFn = prepareLoadRemoteData; // assign a dummy function. + pOperator->getNextFn = doLoadRemoteData; pOperator->closeFn = destroyExchangeOperatorInfo; #if 1 @@ -5387,10 +5411,12 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->name = "TableScanOperator"; pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->nextDataFn = doTableScan; + pOperator->openFn = operatorDummyOpenFn; + pOperator->getNextFn = doTableScan; + pOperator->closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -5411,11 +5437,11 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = OP_TableSeqScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doTableScanImpl; + pOperator->getNextFn = doTableScanImpl; return pOperator; } @@ -5436,10 +5462,10 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt pOperator->name = "TableBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->nextDataFn = doBlockInfoScan; + pOperator->getNextFn = doBlockInfoScan; return pOperator; } @@ -5478,10 +5504,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = OP_StreamScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->nextDataFn = doStreamBlockScan; + pOperator->openFn = operatorDummyOpenFn; + pOperator->getNextFn = doStreamBlockScan; + pOperator->closeFn = operatorDummyCloseFn; + pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -5692,7 +5721,7 @@ SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; bool newgroup = false; - return pOperator->nextDataFn(pOperator, &newgroup); + return pOperator->getNextFn(pOperator, &newgroup); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { @@ -6006,13 +6035,13 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->name = "SortedMerge"; pOperator->operatorType = OP_SortedMerge; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doSortedMerge; + pOperator->getNextFn = doSortedMerge; pOperator->closeFn = destroySortedMergeOperatorInfo; code = appendDownstream(pOperator, downstream, numOfDownstream); @@ -6104,11 +6133,11 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI pOperator->name = "Order"; pOperator->operatorType = OP_Order; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doSort; + pOperator->getNextFn = doSort; pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -6134,7 +6163,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6184,7 +6213,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6270,7 +6299,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6328,7 +6357,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6379,7 +6408,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + SSDataBlock *pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6422,7 +6451,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6482,7 +6511,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6545,7 +6574,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6600,7 +6629,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6735,7 +6764,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6797,7 +6826,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6850,7 +6879,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6935,7 +6964,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -7090,14 +7119,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->name = "TableAggregate"; pOperator->operatorType = OP_Aggregate; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doAggregate; - pOperator->closeFn = destroyAggOperatorInfo; + pOperator->openFn = operatorDummyOpenFn; + pOperator->getNextFn = doAggregate; + pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7199,12 +7229,12 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->name = "MultiTableAggregate"; pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; - pOperator->nextDataFn = doMultiTableAggregate; + pOperator->getNextFn = doMultiTableAggregate; pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7225,12 +7255,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp pOperator->name = "ProjectOperator"; pOperator->operatorType = OP_Project; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->nextDataFn = doProjectOperation; + pOperator->getNextFn = doProjectOperation; pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7283,10 +7313,10 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "FilterOperator"; // pOperator->operatorType = OP_Filter; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->numOfOutput = numOfOutput; pOperator->pExpr = pExpr; - pOperator->nextDataFn = doFilter; + pOperator->getNextFn = doFilter; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->closeFn = destroyConditionOperatorInfo; @@ -7304,8 +7334,8 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->name = "LimitOperator"; // pOperator->operatorType = OP_Limit; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->nextDataFn = doLimit; + pOperator->status = OP_NOT_OPENED; + pOperator->getNextFn = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7335,13 +7365,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = OP_TimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->info = pInfo; - pOperator->nextDataFn = doIntervalAgg; + pOperator->getNextFn = doIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -7360,12 +7390,12 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->name = "AllTimeIntervalAggOperator"; // pOperator->operatorType = OP_AllTimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doAllIntervalAgg; + pOperator->getNextFn = doAllIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7384,12 +7414,12 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->name = "StateWindowOperator"; // pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doStateWindowAgg; + pOperator->getNextFn = doStateWindowAgg; pOperator->closeFn = destroyStateWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7409,12 +7439,12 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->name = "SessionWindowAggOperator"; // pOperator->operatorType = OP_SessionWindow; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doSessionWindowAgg; + pOperator->getNextFn = doSessionWindowAgg; pOperator->closeFn = destroySWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7432,13 +7462,13 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->name = "MultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doSTableIntervalAgg; + pOperator->getNextFn = doSTableIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7456,13 +7486,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->name = "AllMultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doAllSTableIntervalAgg; + pOperator->getNextFn = doAllSTableIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7488,13 +7518,13 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = hashGroupbyAggregate; + pOperator->getNextFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupbyOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7527,13 +7557,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->name = "FillOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Fill; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = doFill; + pOperator->getNextFn = doFill; pOperator->closeFn = destroySFillOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7578,7 +7608,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "SLimitOperator"; pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->exec = doSLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; @@ -7734,9 +7764,9 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo pOperator->name = "SeqTableTagScan"; pOperator->operatorType = OP_TagScan; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->nextDataFn = doTagScan; + pOperator->getNextFn = doTagScan; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; @@ -7806,7 +7836,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -7872,13 +7902,13 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Distinct; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->nextDataFn = hashDistinct; + pOperator->getNextFn = hashDistinct; pOperator->pExpr = pExpr; pOperator->closeFn = destroyDistinctOperatorInfo; @@ -8589,75 +8619,6 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { pResultInfo->total = 0; } -FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { - return ((SQInfo *)qHandle)->qId == qId; -} - -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, - int32_t prevResultLen, void* merger) { - int32_t code = TSDB_CODE_SUCCESS; - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - pRuntimeEnv->qinfo = pQInfo; - - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - STSBuf *pTsBuf = NULL; - - if (pTsBufInfo->tsLen > 0) { // open new file to save the result - char* tsBlock = start + pTsBufInfo->tsOffset; - pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder, - pQueryAttr->vgId); - - if (pTsBuf == NULL) { - code = TSDB_CODE_QRY_NO_DISKSPACE; - goto _error; - } - tsBufResetPos(pTsBuf); - bool ret = tsBufNextPos(pTsBuf); - UNUSED(ret); - } - - SArray* prevResult = NULL; - if (prevResultLen > 0) { - prevResult = interResFromBinary(param->prevResult, prevResultLen); - pRuntimeEnv->prevResult = prevResult; - } - - pRuntimeEnv->currentOffset = pQueryAttr->limit.offset; - if (tsdb != NULL) { -// pQueryAttr->precision = tsdbGetCfg(tsdb)->precision; - } - - if ((QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.skey > pQueryAttr->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) { - //qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey, -// pQueryAttr->window.ekey, pQueryAttr->order.order); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; - // todo free memory - return TSDB_CODE_SUCCESS; - } - - if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { - //qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - return TSDB_CODE_SUCCESS; - } - - // filter the qualified - if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) { - goto _error; - } - - return code; - -_error: - // table query ref will be decrease during error handling -// doDestroyTask(pQInfo); - return code; -} - //TODO refactor void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { if (pFilter == NULL || numOfFilters == 0) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index ff29d5f355..607a70437f 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { - pOperator->nextDataFn = getDummyBlock; + pOperator->getNextFn = getDummyBlock; } else { - pOperator->nextDataFn = get2ColsDummyBlock; + pOperator->getNextFn = get2ColsDummyBlock; } SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); @@ -837,7 +837,7 @@ TEST(testCase, inMem_sort_Test) { SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); bool newgroup = false; - SSDataBlock* pRes = pOperator->nextDataFn(pOperator, &newgroup); + SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup); SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); @@ -912,7 +912,7 @@ TEST(testCase, external_sort_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->nextDataFn(pOperator, &newgroup); + pRes = pOperator->getNextFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { @@ -984,7 +984,7 @@ TEST(testCase, sorted_merge_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->nextDataFn(pOperator, &newgroup); + pRes = pOperator->getNextFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { @@ -1062,7 +1062,7 @@ TEST(testCase, time_interval_Operator_Test) { while(1) { int64_t s = taosGetTimestampUs(); - pRes = pOperator->nextDataFn(pOperator, &newgroup); + pRes = pOperator->getNextFn(pOperator, &newgroup); int64_t e = taosGetTimestampUs(); if (t++ == 1) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7a6fa4cfd7..c7ca0a6132 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -494,7 +494,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ASSERT(pRes->info.rows > 0); - SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; + SInputData inputData = {.pData = pRes}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); if (code) { QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); From baf0ac2d3150048899aaab1ff4b330602353adfc Mon Sep 17 00:00:00 2001 From: ubuntu Date: Sat, 12 Mar 2022 11:31:07 +0800 Subject: [PATCH 3/5] update UT test --- include/libs/transport/trpc.h | 7 - source/libs/transport/test/CMakeLists.txt | 18 -- source/libs/transport/test/pushClient.c | 242 ---------------------- 3 files changed, 267 deletions(-) delete mode 100644 source/libs/transport/test/pushClient.c diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index feb493b50f..fdc9368b76 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -29,7 +29,6 @@ extern "C" { extern int tsRpcHeadSize; -typedef struct SRpcPush SRpcPush; typedef struct SRpcConnInfo { uint32_t clientIp; @@ -45,14 +44,8 @@ typedef struct SRpcMsg { int32_t code; void * handle; // rpc handle returned to app void * ahandle; // app handle set by client - int persist; // keep handle or not, default 0 - } SRpcMsg; -typedef struct SRpcPush { - void *arg; - int (*callback)(void *arg, SRpcMsg *rpcMsg); -} SRpcPush; typedef struct SRpcInit { uint16_t localPort; // local port diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index b4f50219ff..b29bad07f0 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -3,7 +3,6 @@ add_executable(client "") add_executable(server "") add_executable(transUT "") add_executable(syncClient "") -add_executable(pushClient "") add_executable(pushServer "") target_sources(transUT @@ -27,10 +26,6 @@ target_sources (syncClient "syncClient.c" ) -target_sources(pushClient - PRIVATE - "pushClient.c" -) target_sources(pushServer PRIVATE "pushServer.c" @@ -102,19 +97,6 @@ target_link_libraries (syncClient transport ) -target_include_directories(pushClient - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/transport" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) -target_link_libraries (pushClient - os - util - common - gtest_main - transport -) - target_include_directories(pushServer PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport" diff --git a/source/libs/transport/test/pushClient.c b/source/libs/transport/test/pushClient.c deleted file mode 100644 index dc9914be35..0000000000 --- a/source/libs/transport/test/pushClient.c +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include -#include "os.h" -#include "rpcLog.h" -#include "taoserror.h" -#include "tglobal.h" -#include "trpc.h" -#include "tutil.h" - -typedef struct { - int index; - SEpSet epSet; - int num; - int numOfReqs; - int msgSize; - tsem_t rspSem; - tsem_t * pOverSem; - pthread_t thread; - void * pRpc; -} SInfo; -static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - pMsg->code); - - if (pEpSet) pInfo->epSet = *pEpSet; - - rpcFreeCont(pMsg->pCont); - // tsem_post(&pInfo->rspSem); - tsem_post(&pInfo->rspSem); -} - -static int tcount = 0; - -typedef struct SPushArg { - tsem_t sem; -} SPushArg; -// ping -int pushCallback(void *arg, SRpcMsg *msg) { - SPushArg *push = arg; - tsem_post(&push->sem); -} -SRpcPush *createPushArg() { - SRpcPush *push = calloc(1, sizeof(SRpcPush)); - push->arg = calloc(1, sizeof(SPushArg)); - - tsem_init(&(((SPushArg *)push->arg)->sem), 0, 0); - push->callback = pushCallback; - return push; -} -static void *sendRequest(void *param) { - SInfo * pInfo = (SInfo *)param; - SRpcMsg rpcMsg = {0}; - - tDebug("thread:%d, start to send request", pInfo->index); - - tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs); - int u100 = 0; - int u500 = 0; - int u1000 = 0; - int u10000 = 0; - - while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { - SRpcPush *push = createPushArg(); - pInfo->num++; - rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); - rpcMsg.contLen = pInfo->msgSize; - rpcMsg.ahandle = pInfo; - rpcMsg.msgType = 1; - // rpcMsg.push = push; - // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - int64_t start = taosGetTimestampUs(); - rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); - if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - tsem_wait(&pInfo->rspSem); // ping->pong - // tsem_wait(&pInfo->rspSem); - SPushArg *arg = push->arg; - /// e - tsem_wait(&arg->sem); // push callback - - // query_fetch(client->h) - int64_t end = taosGetTimestampUs() - start; - if (end <= 100) { - u100++; - } else if (end > 100 && end <= 500) { - u500++; - } else if (end > 500 && end < 1000) { - u1000++; - } else { - u10000++; - } - - tDebug("recv response succefully"); - - // usleep(100000000); - } - - tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000); - tDebug("thread:%d, it is over", pInfo->index); - tcount++; - - return NULL; -} - -int main(int argc, char *argv[]) { - SRpcInit rpcInit; - SEpSet epSet; - int msgSize = 128; - int numOfReqs = 0; - int appThreads = 1; - char serverIp[40] = "127.0.0.1"; - char secret[20] = "mypassword"; - struct timeval systemTime; - int64_t startTime, endTime; - pthread_attr_t thattr; - - // server info - epSet.inUse = 0; - addEpIntoEpSet(&epSet, serverIp, 7000); - addEpIntoEpSet(&epSet, "192.168.0.1", 7000); - - // client info - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "APP"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = processResponse; - rpcInit.sessions = 100; - rpcInit.idleTime = 100; - rpcInit.user = "michael"; - rpcInit.secret = secret; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.connType = TAOS_CONN_CLIENT; - - for (int i = 1; i < argc; ++i) { - if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { - epSet.eps[0].port = atoi(argv[++i]); - } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { - tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn)); - } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) { - rpcInit.numOfThreads = atoi(argv[++i]); - } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) { - msgSize = atoi(argv[++i]); - } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) { - rpcInit.sessions = atoi(argv[++i]); - } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) { - numOfReqs = atoi(argv[++i]); - } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) { - appThreads = atoi(argv[++i]); - } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) { - tsCompressMsgSize = atoi(argv[++i]); - } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) { - rpcInit.user = argv[++i]; - } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) { - rpcInit.secret = argv[++i]; - } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) { - rpcInit.spi = atoi(argv[++i]); - } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { - rpcDebugFlag = atoi(argv[++i]); - } else { - printf("\nusage: %s [options] \n", argv[0]); - printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); - printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port); - printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); - printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); - printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); - printf(" [-a threads]: number of app threads, default is:%d\n", appThreads); - printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs); - printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); - printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user); - printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret); - printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi); - printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag); - printf(" [-h help]: print out this help\n\n"); - exit(0); - } - } - - taosInitLog("client.log", 10); - - void *pRpc = rpcOpen(&rpcInit); - if (pRpc == NULL) { - tError("failed to initialize RPC"); - return -1; - } - - tInfo("client is initialized"); - tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); - - gettimeofday(&systemTime, NULL); - startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - - SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads); - - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - - for (int i = 0; i < appThreads; ++i) { - pInfo->index = i; - pInfo->epSet = epSet; - pInfo->numOfReqs = numOfReqs; - pInfo->msgSize = msgSize; - tsem_init(&pInfo->rspSem, 0, 0); - pInfo->pRpc = pRpc; - pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); - pInfo++; - } - - do { - usleep(1); - } while (tcount < appThreads); - - gettimeofday(&systemTime, NULL); - endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; - float usedTime = (endTime - startTime) / 1000.0f; // mseconds - - tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); - tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); - - int ch = getchar(); - UNUSED(ch); - - taosCloseLog(); - - return 0; -} From 30f602fae0f406f76950ecea5ec2ec5ad7b90bc8 Mon Sep 17 00:00:00 2001 From: ubuntu Date: Sat, 12 Mar 2022 13:59:21 +0800 Subject: [PATCH 4/5] update UT test --- source/libs/transport/src/transCli.c | 197 ++++++++++++++------------- 1 file changed, 99 insertions(+), 98 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 834337d0ff..727845b7a9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,12 +63,12 @@ typedef struct SCliThrdObj { bool quit; } SCliThrdObj; -typedef struct SClientObj { +typedef struct SCliObj { char label[TSDB_LABEL_LEN]; int32_t index; int numOfThreads; SCliThrdObj** pThreadObj; -} SClientObj; +} SCliObj; typedef struct SConnList { queue conn; @@ -82,32 +82,32 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); // register timer in each thread to clear expire conn -static void clientTimeoutCb(uv_timer_t* handle); -// alloc buf for read -static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -// callback after read nbytes from socket -static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void cliTimeoutCb(uv_timer_t* handle); +// alloc buf for recv +static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +// callback after read nbytes from socket +static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after write data to socket -static void clientSendDataCb(uv_write_t* req, int status); +static void cliSendCb(uv_write_t* req, int status); // callback after conn to server -static void clientConnCb(uv_connect_t* req, int status); -static void clientAsyncCb(uv_async_t* handle); +static void cliConnCb(uv_connect_t* req, int status); +static void cliAsyncCb(uv_async_t* handle); -static SCliConn* clientConnCreate(SCliThrdObj* thrd); -static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void clientDestroy(uv_handle_t* handle); +static SCliConn* cliCreateConn(SCliThrdObj* thrd); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); // process data read from server, add decompress etc later -static void clientHandleResp(SCliConn* conn); +static void cliHandleResp(SCliConn* conn); // handle except about conn -static void clientHandleExcept(SCliConn* conn); +static void cliHandleExcept(SCliConn* conn); // handle req from app -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void clientSendQuit(SCliThrdObj* thrd); +static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(SRpcMsg* userdata); -static int clientRBChoseIdx(SRpcInfo* pTransInst); +static int cliRBChoseIdx(SRpcInfo* pTransInst); static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); @@ -122,7 +122,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ - clientHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0) @@ -130,15 +130,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_BROKEN(conn) \ do { \ if (conn->broken) { \ - clientHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0); -static void* clientThread(void* arg); +static void* cliWorkThread(void* arg); -static void* clientNotifyApp() {} -static void clientHandleResp(SCliConn* conn) { +static void* cliNotifyApp() {} +static void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = conn->data; STransConnCtx* pCtx = pMsg->ctx; @@ -164,25 +164,25 @@ static void clientHandleResp(SCliConn* conn) { transRefCliHandle(conn); conn->persist = 1; - tDebug("client conn %p persist by app", conn); + tDebug("cli conn %p persist by app", conn); } - tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pTransInst->label, conn); + tTrace("%s cli conn %p handle resp", pTransInst->label, conn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); + tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } - uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb); + uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); // user owns conn->persist = 1 if (conn->persist == 0) { @@ -193,10 +193,10 @@ static void clientHandleResp(SCliConn* conn) { // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { - // uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } -static void clientHandleExcept(SCliConn* pConn) { +static void cliHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { // handle conn except in conn pool transUnrefCliHandle(pConn); @@ -214,25 +214,25 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.msgType = pMsg->msg.msgType + 1; if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pTransInst->label, pConn); + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn); + tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); tsem_post(pCtx->pSem); } destroyCmsg(pConn->data); pConn->data = NULL; - tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); transUnrefCliHandle(pConn); } -static void clientTimeoutCb(uv_timer_t* handle) { +static void cliTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label); + tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -250,7 +250,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { } pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } static void* createConnPool(int size) { // thread local, no lock @@ -263,7 +263,7 @@ static void* destroyConnPool(void* pool) { queue* h = QUEUE_HEAD(&connList->conn); QUEUE_REMOVE(h); SCliConn* c = QUEUE_DATA(h, SCliConn, conn); - clientConnDestroy(c, true); + cliDestroyConn(c, true); } connList = taosHashIterate((SHashObj*)pool, connList); } @@ -299,7 +299,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); + tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -309,12 +309,12 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); } -static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); } -static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { +static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later if (handle->data == NULL) { return; @@ -324,10 +324,10 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (transReadComplete(pBuf)) { - tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - clientHandleResp(conn); + tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn); + cliHandleResp(conn); } else { - tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); } return; } @@ -340,13 +340,13 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0) { - tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); + tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); conn->broken = true; - clientHandleExcept(conn); + cliHandleExcept(conn); } } -static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { +static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { SCliConn* conn = calloc(1, sizeof(SCliConn)); // read/write stream handle conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); @@ -362,40 +362,40 @@ static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { transRefCliHandle(conn); return conn; } -static void clientConnDestroy(SCliConn* conn, bool clear) { - tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); +static void cliDestroyConn(SCliConn* conn, bool clear) { + tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); if (clear) { - uv_close((uv_handle_t*)conn->stream, clientDestroy); + uv_close((uv_handle_t*)conn->stream, cliDestroy); } } -static void clientDestroy(uv_handle_t* handle) { +static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->stream); - tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } -static void clientSendDataCb(uv_write_t* req, int status) { +static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { - tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { return; } destroyUserdata(&pMsg->msg); } else { - tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); - clientHandleExcept(pConn); + tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); + cliHandleExcept(pConn); return; } - uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb); + uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); } -static void clientSendData(SCliConn* pConn) { +static void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); SCliMsg* pCliMsg = pConn->data; @@ -432,22 +432,22 @@ static void clientSendData(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); - uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb); + uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; _RETURE: return; } -static void clientConnCb(uv_connect_t* req, int status) { +static void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { - tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); - clientHandleExcept(pConn); + tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); + cliHandleExcept(pConn); return; } int addrlen = sizeof(pConn->addr); @@ -456,14 +456,14 @@ static void clientConnCb(uv_connect_t* req, int status) { addrlen = sizeof(pConn->locaddr); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); - tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); - clientSendData(pConn); + cliSend(pConn); } -static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { - tDebug("client work thread %p start to quit", pThrd); +static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { + tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); destroyConnPool(pThrd->pool); @@ -472,57 +472,57 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } -static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { +static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); transUnrefCliHandle(conn); if (conn != NULL) { - tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); } } else { STransConnCtx* pCtx = pMsg->ctx; conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } return conn; } -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { +static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); + tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pTransInst = pThrd->pTransInst; - SCliConn* conn = clientGetConn(pMsg, pThrd); + SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->data = pMsg; transDestroyBuffer(&conn->readBuf); - clientSendData(conn); + cliSend(conn); } else { - conn = clientConnCreate(pThrd); + conn = cliCreateConn(pThrd); conn->data = pMsg; int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { - tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); + tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); } struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect - tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); - uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); + uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } conn->hThrdIdx = pCtx->hThrdIdx; } -static void clientAsyncCb(uv_async_t* handle) { +static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SCliThrdObj* pThrd = item->pThrd; SCliMsg* pMsg = NULL; - queue wq; // batch process to avoid to lock/unlock frequently + queue wq; pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); @@ -534,25 +534,25 @@ static void clientAsyncCb(uv_async_t* handle) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); if (pMsg->ctx == NULL) { - clientHandleQuit(pMsg, pThrd); + cliHandleQuit(pMsg, pThrd); } else { - clientHandleReq(pMsg, pThrd); + cliHandleReq(pMsg, pThrd); } count++; } if (count >= 2) { - tTrace("client process batch size: %d", count); + tTrace("cli process batch size: %d", count); } } -static void* clientThread(void* arg) { +static void* cliWorkThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - setThreadName("trans-client-work"); + setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); } void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SClientObj* cli = calloc(1, sizeof(SClientObj)); + SCliObj* cli = calloc(1, sizeof(SCliObj)); SRpcInfo* pRpc = shandle; memcpy(cli->label, label, strlen(label)); @@ -564,9 +564,9 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->pTransInst = shandle; - int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); + int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err == 0) { - tDebug("success to create tranport-client thread %d", i); + tDebug("success to create tranport-cli thread %d", i); } cli->pThreadObj[i] = pThrd; } @@ -591,13 +591,14 @@ static void destroyCmsg(SCliMsg* pMsg) { static SCliThrdObj* createThrdObj() { SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; @@ -628,21 +629,21 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // -static void clientSendQuit(SCliThrdObj* thrd) { +static void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); transSendAsync(thrd->asyncPool, &msg->q); } void taosCloseClient(void* arg) { - SClientObj* cli = arg; + SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - clientSendQuit(cli->pThreadObj[i]); + cliSendQuit(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); free(cli); } -static int clientRBChoseIdx(SRpcInfo* pTransInst) { +static int cliRBChoseIdx(SRpcInfo* pTransInst) { int64_t index = pTransInst->index; if (pTransInst->index++ >= pTransInst->numOfThreads) { pTransInst->index = 0; @@ -662,7 +663,7 @@ void transUnrefCliHandle(void* handle) { } int ref = T_REF_DEC((SCliConn*)handle); if (ref == 0) { - clientConnDestroy((SCliConn*)handle, true); + cliDestroyConn((SCliConn*)handle, true); } // unref cli handle @@ -676,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* int index = CONN_HOST_THREAD_INDEX(pMsg->handle); if (index == -1) { - index = clientRBChoseIdx(pTransInst); + index = cliRBChoseIdx(pTransInst); } int32_t flen = 0; if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { @@ -697,7 +698,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } @@ -709,7 +710,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { - index = clientRBChoseIdx(pTransInst); + index = cliRBChoseIdx(pTransInst); } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); @@ -727,7 +728,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); From 5f376274be94397fd9644601923ff88c56292a02 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Mar 2022 15:28:15 +0800 Subject: [PATCH 5/5] [td-13039] fix bug in project query. --- source/libs/executor/inc/executorimpl.h | 3 ++- source/libs/executor/src/executorimpl.c | 32 ++++++++++++++++++------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d0c1a3f0b5..ee841e3ce9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -334,9 +334,10 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_open_fn_t openFn; __optr_fn_t getNextFn; + __optr_fn_t cleanupFn; __optr_close_fn_t closeFn; + __optr_open_fn_t _openFn; // DO NOT invoke this function directly } SOperatorInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ec220098ac..420d70233c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -224,9 +224,12 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } +#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) +#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) + static int32_t operatorDummyOpenFn(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - pOperator->status = OP_OPENED; + OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -4734,6 +4737,11 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->pTsdbReadHandle == NULL) { return NULL; @@ -4851,6 +4859,11 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; pBlockInfo->rows = 0; @@ -5159,7 +5172,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { static int32_t prepareLoadRemoteData(void* param) { SOperatorInfo *pOperator = (SOperatorInfo*) param; - if ((pOperator->status & OP_OPENED) == OP_OPENED) { + if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; } @@ -5173,7 +5186,7 @@ static int32_t prepareLoadRemoteData(void* param) { } } - pOperator->status = OP_OPENED; + OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -5183,12 +5196,13 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - if ((pOperator->status & OP_OPENED) != OP_OPENED) { - pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; // TODO add a new error code + int32_t code = pOperator->_openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; return NULL; } + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); @@ -5286,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->info = pInfo; pOperator->numOfOutput = size; pOperator->pTaskInfo = pTaskInfo; - pOperator->openFn = prepareLoadRemoteData; // assign a dummy function. + pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function. pOperator->getNextFn = doLoadRemoteData; pOperator->closeFn = destroyExchangeOperatorInfo; @@ -5377,7 +5391,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->openFn = operatorDummyOpenFn; + pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = doTableScan; pOperator->closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; @@ -5461,7 +5475,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->openFn = operatorDummyOpenFn; + pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = doStreamBlockScan; pOperator->closeFn = operatorDummyCloseFn;