From 24090f0fc3077d1f4b72d842dd385fa1df336fc7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Jul 2022 15:30:38 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/src/executorimpl.c | 19 +---- source/libs/executor/src/scanoperator.c | 98 ------------------------- source/libs/function/src/builtinsimpl.c | 11 ++- 3 files changed, 11 insertions(+), 117 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e9a244b573..2ae09e0434 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -538,7 +538,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt return code; } -static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { +static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { // todo add a dummy funtion to avoid process check @@ -2969,25 +2969,10 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); - code = doAggregateImpl(pOperator, 0, pSup->pCtx); + code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { longjmp(pTaskInfo->env, code); } - -#if 0 // test for encode/decode result info - if(pOperator->fpSet.encodeResultRow){ - char *result = NULL; - int32_t length = 0; - pOperator->fpSet.encodeResultRow(pOperator, &result, &length); - SAggSupporter* pSup = &pAggInfo->aggSup; - taosHashClear(pSup->pResultRowHashTable); - pInfo->resultRowInfo.size = 0; - pOperator->fpSet.decodeResultRow(pOperator, result); - if(result){ - taosMemoryFree(result); - } - } -#endif } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6ab3effc85..f3c240b6f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2860,101 +2860,3 @@ _error: return NULL; } -static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SLastrowScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t size = taosArrayGetSize(pInfo->pTableList); - if (size == 0) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); - return NULL; - } - - // check if it is a group by tbname - if (size == taosArrayGetSize(pInfo->pTableList)) { - blockDataCleanup(pInfo->pRes); - tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds); - return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; - } else { - // todo fetch the result for each group - } - - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; -} - -static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { - SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; - blockDataDestroy(pInfo->pRes); - tsdbLastrowReaderClose(pInfo->pLastrowReader); - - taosMemoryFreeClear(param); -} - -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, - SExecTaskInfo* pTaskInfo) { - SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } - - pInfo->pTableList = pTableList; - pInfo->readHandle = *readHandle; - pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); - - int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, - COL_MATCH_FROM_COL_ID); - int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t)); - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); - pCols[i] = pColMatch->colId; - } - - pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0])); - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); - for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) { - if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && - pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - pInfo->pSlotIds[pColMatch->targetSlotId] = -1; - break; - } - - if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) { - pInfo->pSlotIds[pColMatch->targetSlotId] = j; - break; - } - } - } - - tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols, - &pInfo->pLastrowReader); - taosMemoryFree(pCols); - - pOperator->name = "LastrowScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - - initResultSizeInfo(pOperator, 1024); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); - pOperator->cost.openCost = 0; - return pOperator; - -_error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - return NULL; -} diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 932bfb8793..5dee2e8480 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2729,7 +2729,6 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } } pInfo->hasResult = true; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; } @@ -2826,7 +2825,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } pInfo->hasResult = true; pResInfo->numOfRes = 1; - // DO_UPDATE_TAG_COLUMNS(pCtx, ts); } break; } @@ -5980,6 +5978,15 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { pInfo->hasResult = true; pResInfo->numOfRes = 1; + + if (pCtx->subsidiaries.num > 0) { + STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); + if (!pInfo->hasResult) { + saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } else { + copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos); + } + } } }