From 0aadc6c996a9a2eaee3d24b7481f8e68763646c4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 May 2021 13:53:45 +0800 Subject: [PATCH] [td-225] refactor. --- src/client/src/tscUtil.c | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b1a7a34011..5bd7417f5f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -658,9 +658,10 @@ typedef struct SJoinStatus { } SJoinStatus; typedef struct SJoinOperatorInfo { - SSDataBlock *pRes; - SJoinStatus *status; - int32_t numOfUpstream; + SSDataBlock *pRes; + SJoinStatus *status; + int32_t numOfUpstream; + SRspResultInfo resultInfo; // todo refactor, add this info for each operator } SJoinOperatorInfo; SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { @@ -796,37 +797,41 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) { SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo)); + pInfo->numOfUpstream = numOfUpstream; pInfo->status = calloc(numOfUpstream, sizeof(SJoinStatus)); + SRspResultInfo* pResInfo = &pInfo->resultInfo; + pResInfo->capacity = 4096; + pResInfo->threshold = 4096 * 0.8; + pInfo->pRes = calloc(1, sizeof(SSDataBlock)); pInfo->pRes->info.numOfCols = numOfOutput; - pInfo->pRes->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); for(int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData colData = {{0}}; colData.info.bytes = pSchema[i].bytes; colData.info.type = pSchema[i].type; colData.info.colId = pSchema[i].colId; - colData.pData = calloc(1, colData.info.bytes * 4096); + colData.pData = calloc(1, colData.info.bytes * pResInfo->capacity); taosArrayPush(pInfo->pRes->pDataBlock, &colData); } - SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); - pOptr->name = "JoinOperator"; - pOptr->operatorType = OP_Join; - pOptr->numOfOutput = numOfOutput; - pOptr->blockingOptr = false; - pOptr->info = pInfo; - pOptr->exec = doBlockJoin; - pOptr->cleanup = destroyDummyInputOperator; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "JoinOperator"; + pOperator->operatorType = OP_Join; + pOperator->numOfOutput = numOfOutput; + pOperator->blockingOptr = false; + pOperator->info = pInfo; + pOperator->exec = doBlockJoin; + pOperator->cleanup = destroyDummyInputOperator; for(int32_t i = 0; i < numOfUpstream; ++i) { - appendUpstream(pOptr, pUpstream[i]); + appendUpstream(pOperator, pUpstream[i]); } - return pOptr; + return pOperator; } void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {