From d6af555905fbc5d20fd0e290a8f29508811f9750 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 21 Mar 2022 22:49:47 +0800 Subject: [PATCH] [td-13039] refactor. --- include/common/tcommon.h | 10 +- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executorimpl.c | 228 ++++-------------------- tests/script/tsim/insert/basic0.sim | 2 + 4 files changed, 36 insertions(+), 212 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 385c123fec..fb787a32df 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -60,16 +60,10 @@ typedef struct SDataBlockInfo { int16_t numOfCols; int16_t hasVarCol; union {int64_t uid; int64_t blockId;}; + int64_t groupId; // no need to serialize } SDataBlockInfo; -//typedef struct SConstantItem { -// SColumnInfo info; -// int32_t startRow; // run-length-encoding to save the space for multiple rows -// int32_t endRow; -// SVariant value; -//} SConstantItem; - -// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); +// info.numOfCols = taosArrayGetSize(pDataBlock) typedef struct SSDataBlock { SColumnDataAgg *pBlockAgg; SArray *pDataBlock; // SArray diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index de789f61ed..66d5f8b5a7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -545,6 +545,7 @@ typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; int32_t colIndex; char* prevData; // previous group by value + SGroupResInfo groupResInfo; } SGroupbyOperatorInfo; typedef struct SSessionAggOperatorInfo { @@ -649,8 +650,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); -SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -673,7 +673,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -// SSDataBlock* doSLimit(void* param, bool* newgroup); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); @@ -697,11 +696,8 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); -int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg); - bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); -bool checkNeedToCompressQueryCol(SQInfo* pQInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3a1560ba33..a1b11c8161 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -336,27 +336,6 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO return res; } -SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) { - size_t numOfOutput = taosArrayGetSize(pExprInfo); - - SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); - res->info.numOfCols = numOfOutput; - res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - - for (int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData idata = {{0}}; - SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - - idata.info.type = pExpr->base.resSchema.type; - idata.info.bytes = pExpr->base.resSchema.bytes; - idata.info.colId = pExpr->base.resSchema.colId; - taosArrayPush(res->pDataBlock, &idata); - } - - blockDataEnsureCapacity(res, numOfRows); - return res; -} - SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); @@ -3327,41 +3306,6 @@ int32_t initResultRow(SResultRow *pResultRow) { * +------------+--------------------------------------------+--------------------------------------------+ * offset[0] offset[1] offset[2] */ -void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) { - SqlFunctionCtx* pCtx = pInfo->pCtx; - SSDataBlock* pDataBlock = pInfo->pRes; - int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; - SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; - - int64_t tid = 0; - pRuntimeEnv->keyBuf = realloc(pRuntimeEnv->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); - SResultRow* pRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid); - - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); - - /* - * set the output buffer information and intermediate buffer - * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. - */ - struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset); - cleanupResultRowEntry(pEntry); - - pCtx[i].resultInfo = pEntry; - pCtx[i].pOutput = pData->pData; - pCtx[i].currentStage = stage; - assert(pCtx[i].pOutput != NULL); - - // set the timestamp output buffer for top/bottom/diff query - int32_t fid = pCtx[i].functionId; - if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) { - if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; - } - } - - initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); -} - // TODO refactor: some function move away void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) { SqlFunctionCtx* pCtx = pInfo->pCtx; @@ -4024,81 +3968,6 @@ static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); } -static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - SSDataBlock* pRes = pRuntimeEnv->outputBuf; - - int32_t *compSizes = NULL; - int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; - - if (compressed) { - compSizes = calloc(numOfCols, sizeof(int32_t)); - } - - if (pQueryAttr->pExpr2 == NULL) { - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - if (compressed) { - compSizes[col] = compressQueryColData(pColRes, pRes->info.rows, data, compressed); - data += compSizes[col]; - *compLen += compSizes[col]; - compSizes[col] = htonl(compSizes[col]); - } else { - memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); - data += pColRes->info.bytes * pRes->info.rows; - } - } - } else { - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - if (compressed) { - compSizes[col] = htonl(compressQueryColData(pColRes, numOfRows, data, compressed)); - data += compSizes[col]; - *compLen += compSizes[col]; - compSizes[col] = htonl(compSizes[col]); - } else { - memmove(data, pColRes->pData, pColRes->info.bytes * numOfRows); - data += pColRes->info.bytes * numOfRows; - } - } - } - - if (compressed) { - memmove(data, (char *)compSizes, numOfCols * sizeof(int32_t)); - data += numOfCols * sizeof(int32_t); - - tfree(compSizes); - } - - int32_t numOfTables = (int32_t) taosHashGetSize(pRuntimeEnv->pTableRetrieveTsMap); - *(int32_t*)data = htonl(numOfTables); - data += sizeof(int32_t); - - int32_t total = 0; - STableIdInfo* item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, NULL); - - while(item) { - STableIdInfo* pDst = (STableIdInfo*)data; - pDst->uid = htobe64(item->uid); - pDst->key = htobe64(item->key); - - data += sizeof(STableIdInfo); - total++; - - //qDebug("QInfo:0x%"PRIx64" set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo->qId, item->tid, item->uid, item->key); - item = taosHashIterate(pRuntimeEnv->pTableRetrieveTsMap, item); - } - - //qDebug("QInfo:0x%"PRIx64" set %d subscribe info", pQInfo->qId, total); - - // Check if query is completed or not for stable query or normal table query respectively. - if (Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { -// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER); - } -} - int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) { // for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { // SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); @@ -4119,10 +3988,10 @@ void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType e event.operatorType = operatorInfo->operatorType; if (operatorInfo->pRuntimeEnv) { - SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo; - if (pQInfo->summary.queryProfEvents) { - taosArrayPush(pQInfo->summary.queryProfEvents, &event); - } +// SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo; +// if (pQInfo->summary.queryProfEvents) { +// taosArrayPush(pQInfo->summary.queryProfEvents, &event); +// } } } @@ -6787,10 +6656,6 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgroup if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); } - - SQInfo* pQInfo = pRuntimeEnv->qinfo; - pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); - return pIntervalInfo->binfo.pRes; } @@ -6883,8 +6748,8 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo *pOperator, bool* newgr pOperator->status = OP_EXEC_DONE; } - SQInfo* pQInfo = pRuntimeEnv->qinfo; - pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); +// SQInfo* pQInfo = pRuntimeEnv->qinfo; +// pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); return pIntervalInfo->binfo.pRes; } @@ -7075,19 +6940,16 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou } SGroupbyOperatorInfo *pInfo = pOperator->info; - - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0/*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { pOperator->status = OP_EXEC_DONE; } - return pInfo->binfo.pRes; } SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t order = TSDB_ORDER_ASC; while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -7098,10 +6960,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); if (pInfo->colIndex == -1) { - pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); + pInfo->colIndex = getGroupbyColumnIndex(NULL/*pInfo->pGroupbyExpr*/, pBlock); } doHashGroupbyAgg(pOperator, pInfo, pBlock); @@ -7109,21 +6971,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows +// if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); - } else { - updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - } +// } else { +// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); +// } - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); - if (!pRuntimeEnv->pQueryAttr->stableQuery) { - sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - } + initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); +// if (!pRuntimeEnv->pQueryAttr->stableQuery) { +// sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); +// } // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { pOperator->status = OP_EXEC_DONE; } @@ -7802,19 +7663,16 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun return pOperator; } -SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); +// pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize * +// (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize * - (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); - - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + pInfo->binfo.pRes = pResBlock; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7825,11 +7683,11 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashGroupbyAggregate; + pOperator->closeFn = destroyGroupbyOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -8330,12 +8188,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, int32_t* numOfExprs) { SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SDataType* pType = &pFuncNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pFuncNode->node.aliasName); + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName); pExp->pExpr->_function.pFunctNode = pFuncNode; - strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, - tListLen(pExp->pExpr->_function.functionName)); + strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); // TODO: value parameter needs to be handled int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); @@ -8933,30 +8789,6 @@ int32_t checkForQueryBuf(size_t numOfTables) { return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; } -bool checkNeedToCompressQueryCol(SQInfo *pQInfo) { - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - SSDataBlock* pRes = pRuntimeEnv->outputBuf; - - if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) <= 0) { - return false; - } - - int32_t numOfRows = pQueryAttr->pExpr2 ? GET_NUM_OF_RESULTS(pRuntimeEnv) : pRes->info.rows; - int32_t numOfCols = pQueryAttr->pExpr2 ? pQueryAttr->numOfExpr2 : pQueryAttr->numOfOutput; - - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * numOfRows; - if (NEEDTO_COMPRESS_QUERY(colSize)) { - return true; - } - } - - return false; -} - void releaseQueryBuf(size_t numOfTables) { if (tsQueryBufferSizeBytes < 0) { return; diff --git a/tests/script/tsim/insert/basic0.sim b/tests/script/tsim/insert/basic0.sim index 79a4682643..b5114d9a29 100644 --- a/tests/script/tsim/insert/basic0.sim +++ b/tests/script/tsim/insert/basic0.sim @@ -52,6 +52,8 @@ sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0) #=================================================================== print =============== query data from child table sql select * from ct1 +print ========> value is : $data00 + if $rows != 4 then # after fix bug, modify 4 to 7 return -1 endi