From 945de54b688fee2a098f022bffcd58313f6b1570 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Apr 2022 14:51:52 +0800 Subject: [PATCH] [td-14493] add new api. --- include/client/taos.h | 2 + source/client/src/clientMain.c | 40 +++++++++++-- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executorimpl.c | 49 ++++++++-------- source/libs/executor/src/groupoperator.c | 74 ++++++++++++++++-------- 5 files changed, 117 insertions(+), 54 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index d537943b82..d3856d432e 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -188,6 +188,8 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 6fe41176da..e10cf5179e 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -385,6 +385,37 @@ bool taos_is_update_query(TAOS_RES *res) { } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { + int32_t numOfRows = 0; + /*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows); + return numOfRows; +} + +int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { + return 0; + } + + (*rows) = NULL; + (*numOfRows) = 0; + + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || + pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + return 0; + } + + doFetchRow(pRequest, false); + + // TODO refactor + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + pResultInfo->current = pResultInfo->numOfRows; + + (*rows) = pResultInfo->row; + (*numOfRows) = pResultInfo->numOfRows; + return pRequest->code; +} + +int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) { SRequestObj *pRequest = (SRequestObj *)res; if (pRequest == NULL) { return 0; @@ -397,12 +428,13 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { doFetchRow(pRequest, false); - // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - pResultInfo->current = pResultInfo->numOfRows; - *rows = pResultInfo->row; - return pResultInfo->numOfRows; + pResultInfo->current = pResultInfo->numOfRows; + (*numOfRows) = pResultInfo->numOfRows; + (*pData) = (void*) pResultInfo->pData; + + return 0; } int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ac8cd82213..57edc40007 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -641,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI uint64_t* total, SArray* pColList); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); +SSDataBlock* loadNextDataBlock(void* param); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, @@ -665,8 +667,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); + #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c5dca0c77d..c73886b5b0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4737,8 +4737,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan pBlock->info.rows += 1; } -static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, - int32_t capacity) { +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { blockDataCleanup(pDataBlock); while (1) { @@ -4759,7 +4758,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; bool newgroup = false; - return pOperator->getNextFn(pOperator, &newgroup); } @@ -4939,7 +4937,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -5084,15 +5082,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, - pInfo->pDataBlock, "GET_TASKID(pTaskInfo)"); + pInfo->pDataBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); @@ -5106,38 +5103,40 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } - pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResBlock; + pInfo->pSortInfo = pSortInfo; - pOperator->name = "Sort"; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doSort; + pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; } static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b9733e118f..b3a8e09f16 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -345,45 +345,73 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return pOperator; _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { - SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortOperatorInfo* pInfo = pOperator->info; + bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + } + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, + pInfo->pDataBlock, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + + SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); + ps->param = pOperator->pDownstream[0]; + tsortAddSource(pInfo->pSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pSortHandle); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); +} + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pGroupCols = pGroupColList; - pInfo->pCondition = pCondition; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResultBlock; + pInfo->pSortInfo = pSortInfo; - int32_t code = initGroupOptrInfo(pInfo, pGroupColList); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pOperator->name = "PartitionByOperator"; + pOperator->name = "PartitionOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; - code = appendDownstream(pOperator, &downstream, 1); + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doPartitionData; +// pOperator->closeFn = destroyOrderOperatorInfo; + + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); return NULL; } \ No newline at end of file