diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index b076cf0eb0..dfe7bfdc25 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -118,7 +118,7 @@ static const SInfosTableSchema userUsersSchema[] = {{.name = "name", . static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "v1_status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index bfe6f94c2a..4c268765e7 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -701,7 +701,7 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i SVgObj *pVgroup = NULL; char *pWrite; int32_t cols = 0; - int32_t dnodeId = pShow->replica; +// int32_t dnodeId = pShow->replica; while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup); @@ -709,17 +709,33 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; - if (pVgid->dnodeId != dnodeId) continue; - cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(uint32_t *)pWrite = pVgroup->vgId; cols++; + SName name = {0}; + char db[TSDB_DB_NAME_LEN] = {0}; + tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB); + tNameGetDbName(&name, db); + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, db); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(uint32_t *)pWrite = 0; //todo: Tables + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, mndGetRoleStr(pVgid->role)); cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(uint32_t *)pWrite = pVgroup->replica; //onlines + cols++; + numOfRows++; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bc7d7353af..726e02fc4e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -493,7 +493,8 @@ typedef struct SAggOperatorInfo { typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; - SSDataBlock* existDataBlock; + SSDataBlock *existDataBlock; + int32_t threshold; } SProjectOperatorInfo; typedef struct SLimitOperatorInfo { @@ -626,7 +627,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index a2e526c2bd..8931385049 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -15,11 +15,12 @@ #include "dataSinkInt.h" #include "dataSinkMgt.h" +#include "executorimpl.h" #include "planner.h" #include "tcompression.h" #include "tglobal.h" #include "tqueue.h" -#include "executorimpl.h" +#include "tdatablock.h" typedef struct SDataDispatchBuf { int32_t useSize; @@ -84,8 +85,11 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema *compLen += compSizes[col]; compSizes[col] = htonl(compSizes[col]); } else { - memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows); - data += pColRes->info.bytes * pInput->pData->info.rows; + for(int32_t i = 0; i < pInput->pData->info.rows; ++i) { + char* pData = colDataGetData(pColRes, i); + memmove(data, pData, pColRes->info.bytes); + data += pColRes->info.bytes; + } } } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8ae8ac00d7..4be896bebf 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1183,13 +1183,6 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, ASSERT(pCtx[i].input.pData[0] != NULL); - // if (pCtx[i].functionId < 0) { - // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); - // pCtx[i].ptsList = (int64_t*) tsInfo->pData; - - // continue; - // } - // uint32_t status = aAggs[pCtx[i].functionId].status; // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -1224,27 +1217,17 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction } } -static void projectApplyFunctions(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx, int32_t numOfOutput) { - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - +static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, int32_t numOfOutput) { for (int32_t k = 0; k < numOfOutput; ++k) { - pCtx[k].startTs = pQueryAttr->window.skey; + if (pCtx[k].fpSet.init == NULL) { // it is a project query + SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); + memcpy(pColInfoData->pData, pCtx[k].input.pData[0]->pData, colDataGetLength(pColInfoData, pCtx[k].input.numOfRows)); + } else { // TODO: arithmetic and other process. - // Always set the asc order for merge stage process - if (pCtx[k].currentStage == MERGE_STAGE) { - pCtx[k].order = TSDB_ORDER_ASC; - } - - pCtx[k].startTs = pQueryAttr->window.skey; - - if (pCtx[k].functionId < 0) { - // load the script and exec -// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; -// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL); -// } else { -// aAggs[pCtx[k].functionId].xFunction(&pCtx[k]); } } + + pResult->info.rows = pCtx[0].input.numOfRows; } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, @@ -2049,9 +2032,14 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC SExprBasicInfo *pFunct = &pExpr->base; SqlFunctionCtx* pCtx = &pFuncCtx[i]; - fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet); - pCtx->input.numOfInputCols = pFunct->numOfParams; + if (pExpr->pExpr->_function.pFunctNode != NULL) { + SFuncExecEnv env = {0}; + fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet); + pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); + pCtx->resDataInfo.interBufSize = env.calcMemSize; + } + pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.pData = calloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = calloc(pFunct->numOfParams, POINTER_BYTES); @@ -2061,10 +2049,6 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC pCtx->order = TSDB_ORDER_ASC; pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; - - SFuncExecEnv env = {0}; - pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); - pCtx->resDataInfo.interBufSize = env.calcMemSize; #if 0 for (int32_t j = 0; j < pCtx->numOfParams; ++j) { // int16_t type = pFunct->param[j].nType; @@ -5423,6 +5407,47 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t tsem_post(&pScanResInfo->ready); } + +static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { + if (pInfo->pCondition == NULL) { + return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes; + } + + SFilterInfo* filter = NULL; + int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); + + SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; + code = filterSetDataFromSlotId(filter, ¶m1); + + int8_t* rowRes = NULL; + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + + SSDataBlock* px = createOneDataBlock(pInfo->pRes); + blockDataEnsureCapacity(px, pInfo->pRes->info.rows); + + //TODO refactor + int32_t numOfRow = 0; + for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { + SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); + + numOfRow = 0; + for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } + + colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); + numOfRow += 1; + } + } + + px->info.rows = numOfRow; + pInfo->pRes = px; + + return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes; +} + static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { // build message and send to mnode to fetch the content of system tables. SOperatorInfo* pOperator = (SOperatorInfo*) param; @@ -5457,7 +5482,6 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { int64_t startTs = taosGetTimestampUs(); pInfo->req.type = pInfo->type; -// tNameGetFullDbName(&pInfo->name, pInfo->req.db); strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); @@ -5496,42 +5520,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); - // do filter the qualified results - { - SFilterInfo *filter = NULL; - code = filterInitFromNode(pInfo->pCondition, &filter, 0); - - SFilterColumnParam param1 = {.numOfCols= pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); - - int8_t *rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); - printf("%d, %d\n", rowRes[0], rowRes[1]); - - SSDataBlock* px = createOneDataBlock(pInfo->pRes); - blockDataEnsureCapacity(px, pInfo->pRes->info.rows); - int32_t numOfRow = 0; - - for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { - SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); - - numOfRow = 0; - for(int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; - } - - colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); - numOfRow += 1; - } - } - - px->info.rows = numOfRow; - pInfo->pRes = px; - } - - return pInfo->pRes; + return doFilterResult(pInfo); } return NULL; @@ -6331,54 +6320,50 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { } static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; + SOperatorInfo* pOperator = param; SProjectOperatorInfo* pProjectInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SSDataBlock* pRes = pInfo->pRes; - int32_t order = pRuntimeEnv->pQueryAttr->order.order; - - pRes->info.rows = 0; + blockDataClearup(pRes, true); if (pProjectInfo->existDataBlock) { // TODO refactor - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - SSDataBlock* pBlock = pProjectInfo->existDataBlock; pProjectInfo->existDataBlock = NULL; *newgroup = true; // todo dynamic set tags - if (pTableQueryInfo != NULL) { +// if (pTableQueryInfo != NULL) { // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); - } +// } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); - projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); + projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput); pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); - if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { + if (pRes->info.rows >= pProjectInfo->binfo.capacity*0.8) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); return pRes; } } + SOperatorInfo* downstream = pOperator->pDownstream[0]; + while(1) { bool prevVal = *newgroup; // The downstream exec may change the value of the newgroup, so use a local variable instead. - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { assert(*newgroup == false); - *newgroup = prevVal; setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); break; @@ -6397,25 +6382,25 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { } } - STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - // todo dynamic set tags - if (pTableQueryInfo != NULL) { -// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); - } + + // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; + // if (pTableQueryInfo != NULL) { + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); - projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); - if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) { + projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput); + if (pRes->info.rows >= pProjectInfo->threshold) { break; } } + copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); - resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); +// resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } @@ -7317,16 +7302,22 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray return pOperator; } -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } - pInfo->binfo.capacity = 4096; - pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); + pInfo->binfo.pRes = pResBlock; pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pCtx == NULL) { + goto _error; + } + // initResultRowInfo(&pBInfo->resultRowInfo, 8); // setFunctionResultOutput(pBInfo, MAIN_SCAN); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; @@ -7336,11 +7327,19 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->nextDataFn = doProjectOperation; - pOperator->pTaskInfo = pTaskInfo; pOperator->closeFn = destroyProjectOperatorInfo; + + pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { @@ -8047,11 +8046,11 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } -SArray* createExprInfo(SAggPhysiNode* pPhyNode) { - int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs); +SArray* createExprInfo(SNodeList* pNodeList) { + int32_t numOfFuncs = LIST_LENGTH(pNodeList); - SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES); - for(int32_t i = 0; i < numOfAggFuncs; ++i) { + SArray* pArray = taosArrayInit(numOfFuncs, POINTER_BYTES); + for(int32_t i = 0; i < numOfFuncs; ++i) { SExprInfo* pExp = calloc(1, sizeof(SExprInfo)); pExp->pExpr = calloc(1, sizeof(tExprNode)); @@ -8063,31 +8062,46 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) { pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn)); SColumn* pCol = pExp->base.pParam[0].pCol; - STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, i); + STargetNode* pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i); ASSERT(pTargetNode->slotId == i); - SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; + // it is a project query + if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { + SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr; - SDataType *pType = &pFuncNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, - pType->scale, pType->precision, pFuncNode->node.aliasName); + SDataType* pType = &pColNode->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); + pCol->slotId = pColNode->slotId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; + pCol->precision = pType->precision; + } else { + SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; - pExp->pExpr->_function.pFunctNode = pFuncNode; - strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); + SDataType* pType = &pFuncNode->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, + pType->precision, pFuncNode->node.aliasName); - // TODO: value parameter needs to be handled - int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); - for(int32_t j = 0; j < numOfParam; ++j) { - SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); - SColumnNode* pcn = (SColumnNode*)p1; + pExp->pExpr->_function.pFunctNode = pFuncNode; + strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, + tListLen(pExp->pExpr->_function.functionName)); - pCol->slotId = pcn->slotId; - pCol->bytes = pcn->node.resType.bytes; - pCol->type = pcn->node.resType.type; - pCol->scale = pcn->node.resType.scale; - pCol->precision = pcn->node.resType.precision; - pCol->dataBlockId = pcn->dataBlockId; + // TODO: value parameter needs to be handled + int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); + for (int32_t j = 0; j < numOfParam; ++j) { + SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); + SColumnNode* pcn = (SColumnNode*)p1; + + pCol->slotId = pcn->slotId; + pCol->bytes = pcn->node.resType.bytes; + pCol->type = pcn->node.resType.type; + pCol->scale = pcn->node.resType.scale; + pCol->precision = pcn->node.resType.precision; + pCol->dataBlockId = pcn->dataBlockId; + } } + taosArrayPush(pArray, &pExp); } @@ -8115,9 +8129,9 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { - if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node - pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); - } +// if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node +// pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); +// } if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { @@ -8158,7 +8172,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa } } - if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { + if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) { + size_t size = LIST_LENGTH(pPhyNode->pChildren); + assert(size == 1); + + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + + SArray* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createProjectOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { size_t size = LIST_LENGTH(pPhyNode->pChildren); assert(size == 1); @@ -8166,7 +8190,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); + SArray* pExprInfo = createExprInfo(((SAggPhysiNode*)pPhyNode)->pAggFuncs); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index b2c5b9ae14..bf581c81a0 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { - pOperator->getNextFn = getDummyBlock; + pOperator->nextDataFn = getDummyBlock; } else { - pOperator->getNextFn = get2ColsDummyBlock; + pOperator->nextDataFn = get2ColsDummyBlock; } SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));