[td-13039] refactor.

This commit is contained in:
Haojun Liao 2022-03-18 18:02:00 +08:00
parent 0354ac2f33
commit 46242e8a43
6 changed files with 182 additions and 137 deletions

View File

@ -118,7 +118,7 @@ static const SInfosTableSchema userUsersSchema[] = {{.name = "name", .
static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, static const SInfosTableSchema vgroupsSchema[] = {{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "db_name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "v1_status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, {.name = "v1_status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},

View File

@ -701,7 +701,7 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
char *pWrite; char *pWrite;
int32_t cols = 0; int32_t cols = 0;
int32_t dnodeId = pShow->replica; // int32_t dnodeId = pShow->replica;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup); pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
@ -709,17 +709,33 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i
for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) { for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->dnodeId != dnodeId) continue;
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVgroup->vgId; *(uint32_t *)pWrite = pVgroup->vgId;
cols++; cols++;
SName name = {0};
char db[TSDB_DB_NAME_LEN] = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&name, db);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, db);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = 0; //todo: Tables
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, mndGetRoleStr(pVgid->role)); STR_TO_VARSTR(pWrite, mndGetRoleStr(pVgid->role));
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVgroup->replica; //onlines
cols++;
numOfRows++; numOfRows++;
} }

View File

@ -493,7 +493,8 @@ typedef struct SAggOperatorInfo {
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SSDataBlock* existDataBlock; SSDataBlock *existDataBlock;
int32_t threshold;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SLimitOperatorInfo { typedef struct SLimitOperatorInfo {
@ -626,7 +627,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);

View File

@ -15,11 +15,12 @@
#include "dataSinkInt.h" #include "dataSinkInt.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tcompression.h" #include "tcompression.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "executorimpl.h" #include "tdatablock.h"
typedef struct SDataDispatchBuf { typedef struct SDataDispatchBuf {
int32_t useSize; int32_t useSize;
@ -84,8 +85,11 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
*compLen += compSizes[col]; *compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]); compSizes[col] = htonl(compSizes[col]);
} else { } else {
memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows); for(int32_t i = 0; i < pInput->pData->info.rows; ++i) {
data += pColRes->info.bytes * pInput->pData->info.rows; char* pData = colDataGetData(pColRes, i);
memmove(data, pData, pColRes->info.bytes);
data += pColRes->info.bytes;
}
} }
} }
} }

View File

@ -1183,13 +1183,6 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
ASSERT(pCtx[i].input.pData[0] != NULL); ASSERT(pCtx[i].input.pData[0] != NULL);
// if (pCtx[i].functionId < 0) {
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// pCtx[i].ptsList = (int64_t*) tsInfo->pData;
// continue;
// }
// uint32_t status = aAggs[pCtx[i].functionId].status; // uint32_t status = aAggs[pCtx[i].functionId].status;
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
@ -1224,27 +1217,17 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
} }
} }
static void projectApplyFunctions(STaskRuntimeEnv *pRuntimeEnv, SqlFunctionCtx *pCtx, int32_t numOfOutput) { static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pQueryAttr->window.skey; if (pCtx[k].fpSet.init == NULL) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
memcpy(pColInfoData->pData, pCtx[k].input.pData[0]->pData, colDataGetLength(pColInfoData, pCtx[k].input.numOfRows));
} else { // TODO: arithmetic and other process.
// Always set the asc order for merge stage process
if (pCtx[k].currentStage == MERGE_STAGE) {
pCtx[k].order = TSDB_ORDER_ASC;
}
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) {
// load the script and exec
// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
// } else {
// aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
} }
} }
pResult->info.rows = pCtx[0].input.numOfRows;
} }
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
@ -2049,9 +2032,14 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
SExprBasicInfo *pFunct = &pExpr->base; SExprBasicInfo *pFunct = &pExpr->base;
SqlFunctionCtx* pCtx = &pFuncCtx[i]; SqlFunctionCtx* pCtx = &pFuncCtx[i];
fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet); if (pExpr->pExpr->_function.pFunctNode != NULL) {
pCtx->input.numOfInputCols = pFunct->numOfParams; SFuncExecEnv env = {0};
fmGetFuncExecFuncs(pExpr->pExpr->_function.pFunctNode->funcId, &pCtx->fpSet);
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
pCtx->resDataInfo.interBufSize = env.calcMemSize;
}
pCtx->input.numOfInputCols = pFunct->numOfParams;
pCtx->input.pData = calloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pData = calloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->input.pColumnDataAgg = calloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = calloc(pFunct->numOfParams, POINTER_BYTES);
@ -2061,10 +2049,6 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC
pCtx->order = TSDB_ORDER_ASC; pCtx->order = TSDB_ORDER_ASC;
pCtx->start.key = INT64_MIN; pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
SFuncExecEnv env = {0};
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
pCtx->resDataInfo.interBufSize = env.calcMemSize;
#if 0 #if 0
for (int32_t j = 0; j < pCtx->numOfParams; ++j) { for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// int16_t type = pFunct->param[j].nType; // int16_t type = pFunct->param[j].nType;
@ -5423,6 +5407,47 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
tsem_post(&pScanResInfo->ready); tsem_post(&pScanResInfo->ready);
} }
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
if (pInfo->pCondition == NULL) {
return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes;
}
SFilterInfo* filter = NULL;
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
//TODO refactor
int32_t numOfRow = 0;
for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
numOfRow = 0;
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
}
px->info.rows = numOfRow;
pInfo->pRes = px;
return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes;
}
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
// build message and send to mnode to fetch the content of system tables. // build message and send to mnode to fetch the content of system tables.
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
@ -5457,7 +5482,6 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
pInfo->req.type = pInfo->type; pInfo->req.type = pInfo->type;
// tNameGetFullDbName(&pInfo->name, pInfo->req.db);
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
@ -5496,42 +5520,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
// do filter the qualified results return doFilterResult(pInfo);
{
SFilterInfo *filter = NULL;
code = filterInitFromNode(pInfo->pCondition, &filter, 0);
SFilterColumnParam param1 = {.numOfCols= pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int8_t *rowRes = NULL;
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
printf("%d, %d\n", rowRes[0], rowRes[1]);
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
int32_t numOfRow = 0;
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
numOfRow = 0;
for(int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
if (rowRes[j] == 0) {
continue;
}
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
numOfRow += 1;
}
}
px->info.rows = numOfRow;
pInfo->pRes = px;
}
return pInfo->pRes;
} }
return NULL; return NULL;
@ -6331,54 +6320,50 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
} }
static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = param;
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
int32_t order = pRuntimeEnv->pQueryAttr->order.order; blockDataClearup(pRes, true);
pRes->info.rows = 0;
if (pProjectInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pProjectInfo->existDataBlock; SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL; pProjectInfo->existDataBlock = NULL;
*newgroup = true; *newgroup = true;
// todo dynamic set tags // todo dynamic set tags
if (pTableQueryInfo != NULL) { // if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
} // }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pProjectInfo->binfo.capacity*0.8) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return pRes; return pRes;
} }
} }
SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while(1) {
bool prevVal = *newgroup; bool prevVal = *newgroup;
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
assert(*newgroup == false); assert(*newgroup == false);
*newgroup = prevVal; *newgroup = prevVal;
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break; break;
@ -6397,25 +6382,25 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
} }
} }
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// todo dynamic set tags // todo dynamic set tags
if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
} // if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
// }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); if (pRes->info.rows >= pProjectInfo->threshold) {
if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) {
break; break;
} }
} }
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); // resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
@ -7317,16 +7302,22 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
return pOperator; return pOperator;
} }
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->binfo.capacity = 4096; pInfo->binfo.pRes = pResBlock;
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pCtx == NULL) {
goto _error;
}
// initResultRowInfo(&pBInfo->resultRowInfo, 8); // initResultRowInfo(&pBInfo->resultRowInfo, 8);
// setFunctionResultOutput(pBInfo, MAIN_SCAN); // setFunctionResultOutput(pBInfo, MAIN_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
@ -7336,11 +7327,19 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doProjectOperation; pOperator->nextDataFn = doProjectOperation;
pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroyProjectOperatorInfo; pOperator->closeFn = destroyProjectOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator; return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} }
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
@ -8047,11 +8046,11 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
return s; return s;
} }
SArray* createExprInfo(SAggPhysiNode* pPhyNode) { SArray* createExprInfo(SNodeList* pNodeList) {
int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs); int32_t numOfFuncs = LIST_LENGTH(pNodeList);
SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES); SArray* pArray = taosArrayInit(numOfFuncs, POINTER_BYTES);
for(int32_t i = 0; i < numOfAggFuncs; ++i) { for(int32_t i = 0; i < numOfFuncs; ++i) {
SExprInfo* pExp = calloc(1, sizeof(SExprInfo)); SExprInfo* pExp = calloc(1, sizeof(SExprInfo));
pExp->pExpr = calloc(1, sizeof(tExprNode)); pExp->pExpr = calloc(1, sizeof(tExprNode));
@ -8063,31 +8062,46 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode) {
pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn)); pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn));
SColumn* pCol = pExp->base.pParam[0].pCol; SColumn* pCol = pExp->base.pParam[0].pCol;
STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, i); STargetNode* pTargetNode = (STargetNode*)nodesListGetNode(pNodeList, i);
ASSERT(pTargetNode->slotId == i); ASSERT(pTargetNode->slotId == i);
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; // it is a project query
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr;
SDataType *pType = &pFuncNode->node.resType; SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
pType->scale, pType->precision, pFuncNode->node.aliasName); pCol->slotId = pColNode->slotId;
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->precision = pType->precision;
} else {
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
pExp->pExpr->_function.pFunctNode = pFuncNode; SDataType* pType = &pFuncNode->node.resType;
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pFuncNode->node.aliasName);
// TODO: value parameter needs to be handled pExp->pExpr->_function.pFunctNode = pFuncNode;
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
for(int32_t j = 0; j < numOfParam; ++j) { tListLen(pExp->pExpr->_function.functionName));
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
SColumnNode* pcn = (SColumnNode*)p1;
pCol->slotId = pcn->slotId; // TODO: value parameter needs to be handled
pCol->bytes = pcn->node.resType.bytes; int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
pCol->type = pcn->node.resType.type; for (int32_t j = 0; j < numOfParam; ++j) {
pCol->scale = pcn->node.resType.scale; SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
pCol->precision = pcn->node.resType.precision; SColumnNode* pcn = (SColumnNode*)p1;
pCol->dataBlockId = pcn->dataBlockId;
pCol->slotId = pcn->slotId;
pCol->bytes = pcn->node.resType.bytes;
pCol->type = pcn->node.resType.type;
pCol->scale = pcn->node.resType.scale;
pCol->precision = pcn->node.resType.precision;
pCol->dataBlockId = pcn->dataBlockId;
}
} }
taosArrayPush(pArray, &pExp); taosArrayPush(pArray, &pExp);
} }
@ -8115,9 +8129,9 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractScanColumnId(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node // if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); // pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0);
} // }
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
@ -8158,7 +8172,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
} }
} }
if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SArray* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createProjectOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1); assert(size == 1);
@ -8166,7 +8190,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); SArray* pExprInfo = createExprInfo(((SAggPhysiNode*)pPhyNode)->pAggFuncs);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo);
} }

View File

@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator->name = "dummyInputOpertor4Test"; pOperator->name = "dummyInputOpertor4Test";
if (numOfCols == 1) { if (numOfCols == 1) {
pOperator->getNextFn = getDummyBlock; pOperator->nextDataFn = getDummyBlock;
} else { } else {
pOperator->getNextFn = get2ColsDummyBlock; pOperator->nextDataFn = get2ColsDummyBlock;
} }
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));