|
|
|
@ -1241,17 +1241,25 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
|
|
|
|
|
static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
|
|
|
|
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
|
|
|
|
if (pCtx[k].fpSet.init == NULL) { // it is a project query
|
|
|
|
|
if (pExpr[k].base.type == QUERY_NODE_COLUMN) { // it is a project query
|
|
|
|
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
|
|
|
|
|
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
|
|
|
|
|
} else { // TODO: arithmetic and other process.
|
|
|
|
|
|
|
|
|
|
pResult->info.rows = pCtx[0].input.numOfRows;
|
|
|
|
|
} else if (pExpr[k].base.type == QUERY_NODE_OPERATOR) {
|
|
|
|
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
|
|
|
|
taosArrayPush(pBlockList, &pSrcBlock);
|
|
|
|
|
|
|
|
|
|
SScalarParam p = {.numOfRows = pSrcBlock->info.rows};
|
|
|
|
|
p.columnData = taosArrayGet(pResult->pDataBlock, k);
|
|
|
|
|
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &p);
|
|
|
|
|
pResult->info.rows = p.numOfRows;
|
|
|
|
|
} else {
|
|
|
|
|
ASSERT(0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pResult->info.rows = pCtx[0].input.numOfRows;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
|
|
|
@ -2008,107 +2016,6 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx *pCtx, int32_t numOfOutput) {
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
|
|
|
|
|
int32_t** rowCellInfoOffset) {
|
|
|
|
|
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
|
|
|
|
|
|
|
|
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx));
|
|
|
|
|
if (pFuncCtx == NULL) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t));
|
|
|
|
|
if (*rowCellInfoOffset == 0) {
|
|
|
|
|
tfree(pFuncCtx);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
|
|
|
SExprBasicInfo *pFunct = &pExpr[i].base;
|
|
|
|
|
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
|
|
|
|
#if 0
|
|
|
|
|
SColIndex *pIndex = &pFunct->colInfo;
|
|
|
|
|
|
|
|
|
|
if (TSDB_COL_REQ_NULL(pIndex->flag)) {
|
|
|
|
|
pCtx->requireNull = true;
|
|
|
|
|
pIndex->flag &= ~(TSDB_COL_NULL);
|
|
|
|
|
} else {
|
|
|
|
|
pCtx->requireNull = false;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
// pCtx->inputBytes = pFunct->colBytes;
|
|
|
|
|
// pCtx->inputType = pFunct->colType;
|
|
|
|
|
|
|
|
|
|
pCtx->ptsOutputBuf = NULL;
|
|
|
|
|
|
|
|
|
|
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
|
|
|
|
pCtx->resDataInfo.type = pFunct->resSchema.type;
|
|
|
|
|
|
|
|
|
|
pCtx->order = pQueryAttr->order.order;
|
|
|
|
|
// pCtx->functionId = pFunct->functionId;
|
|
|
|
|
pCtx->stableQuery = pQueryAttr->stableQuery;
|
|
|
|
|
// pCtx->resDataInfo.interBufSize = pFunct->interBytes;
|
|
|
|
|
pCtx->start.key = INT64_MIN;
|
|
|
|
|
pCtx->end.key = INT64_MIN;
|
|
|
|
|
|
|
|
|
|
pCtx->numOfParams = pFunct->numOfParams;
|
|
|
|
|
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
|
|
|
|
int16_t type = pFunct->pParam[j].param.nType;
|
|
|
|
|
int16_t bytes = pFunct->pParam[j].param.nType;
|
|
|
|
|
|
|
|
|
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
|
|
|
|
// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type);
|
|
|
|
|
} else {
|
|
|
|
|
// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set the order information for top/bottom query
|
|
|
|
|
int32_t functionId = pCtx->functionId;
|
|
|
|
|
|
|
|
|
|
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
|
|
|
|
int32_t f = getExprFunctionId(&pExpr[0]);
|
|
|
|
|
assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
|
|
|
|
|
|
|
|
|
|
pCtx->param[2].i = pQueryAttr->order.order;
|
|
|
|
|
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
|
|
|
|
|
pCtx->param[3].i = functionId;
|
|
|
|
|
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
|
|
|
|
|
|
|
|
|
|
pCtx->param[1].i = pQueryAttr->order.col.colId;
|
|
|
|
|
} else if (functionId == FUNCTION_INTERP) {
|
|
|
|
|
pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
|
|
|
|
|
if (pQueryAttr->fillVal != NULL) {
|
|
|
|
|
if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
|
|
|
|
|
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
|
|
|
|
|
} else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
|
|
|
|
|
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
|
|
|
|
|
taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if (functionId == FUNCTION_TS_COMP) {
|
|
|
|
|
pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client
|
|
|
|
|
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
|
|
|
|
|
} else if (functionId == FUNCTION_TWA) {
|
|
|
|
|
pCtx->param[1].i = pQueryAttr->window.skey;
|
|
|
|
|
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
|
|
|
|
|
pCtx->param[2].i = pQueryAttr->window.ekey;
|
|
|
|
|
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
|
|
|
|
|
} else if (functionId == FUNCTION_ARITHM) {
|
|
|
|
|
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// for(int32_t i = 1; i < numOfOutput; ++i) {
|
|
|
|
|
// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
|
|
|
|
|
|
|
|
|
|
return pFuncCtx;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
|
|
|
|
|
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx));
|
|
|
|
|
if (pFuncCtx == NULL) {
|
|
|
|
@ -2127,15 +2034,18 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
|
|
|
|
|
SExprBasicInfo *pFunct = &pExpr->base;
|
|
|
|
|
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
|
|
|
|
|
|
|
|
|
if (pExpr->pExpr->_function.pFunctNode != NULL) {
|
|
|
|
|
pCtx->functionId = -1;
|
|
|
|
|
if (pExpr->base.type == QUERY_NODE_FUNCTION) {
|
|
|
|
|
SFuncExecEnv env = {0};
|
|
|
|
|
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
|
|
|
|
|
|
|
|
|
|
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
|
|
|
|
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
|
|
|
|
|
pCtx->resDataInfo.interBufSize = env.calcMemSize;
|
|
|
|
|
} else {
|
|
|
|
|
pCtx->functionId = -1;
|
|
|
|
|
} else if (pExpr->base.type == QUERY_NODE_COLUMN) {
|
|
|
|
|
|
|
|
|
|
} else if (pExpr->base.type == QUERY_NODE_OPERATOR) {
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pCtx->input.numOfInputCols = pFunct->numOfParams;
|
|
|
|
@ -6560,7 +6470,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
|
|
|
|
|
blockDataCleanup(pRes);
|
|
|
|
|
|
|
|
|
|
if (pProjectInfo->existDataBlock) { // TODO refactor
|
|
|
|
|
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
|
|
|
|
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
|
|
|
|
pProjectInfo->existDataBlock = NULL;
|
|
|
|
|
*newgroup = true;
|
|
|
|
@ -6574,9 +6483,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
|
|
|
|
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
|
|
|
|
|
|
|
|
|
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
|
|
|
|
projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
|
|
|
|
|
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL);
|
|
|
|
|
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
if (pRes->info.rows >= pProjectInfo->binfo.capacity*0.8) {
|
|
|
|
|
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
@ -6619,15 +6526,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
|
|
|
|
|
|
|
|
|
|
// the pDataBlock are always the same one, no need to call this again
|
|
|
|
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
|
|
|
|
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
|
|
|
|
|
|
|
|
|
|
projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
if (pRes->info.rows >= pProjectInfo->threshold) {
|
|
|
|
|
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -7717,7 +7622,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|
|
|
|
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
|
|
|
|
|
|
|
|
|
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
|
|
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
|
|
|
|
|
|
|
|
@ -7742,7 +7647,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
|
|
|
|
|
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
|
|
|
|
|
pInfo->colIndex = -1;
|
|
|
|
|
pInfo->reptScan = false;
|
|
|
|
|
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
|
|
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
|
|
|
|
|
|
|
|
@ -7807,7 +7712,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|
|
|
|
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
|
|
|
|
|
|
|
|
|
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
|
|
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
|
|
|
|
|
|
|
|
@ -7831,7 +7736,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
|
|
|
|
|
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
|
|
|
|
|
|
|
|
|
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
|
|
|
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
|
|
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
|
|
|
|
|
|
|
|
@ -8439,16 +8344,18 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|
|
|
|
|
|
|
|
|
// it is a project query, or group by column
|
|
|
|
|
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
|
|
|
|
|
pExp->base.type = QUERY_NODE_COLUMN;
|
|
|
|
|
SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr;
|
|
|
|
|
|
|
|
|
|
SDataType* pType = &pColNode->node.resType;
|
|
|
|
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
|
|
|
|
|
pCol->slotId = pColNode->slotId;
|
|
|
|
|
pCol->slotId = pColNode->slotId; // TODO refactor
|
|
|
|
|
pCol->bytes = pType->bytes;
|
|
|
|
|
pCol->type = pType->type;
|
|
|
|
|
pCol->scale = pType->scale;
|
|
|
|
|
pCol->precision = pType->precision;
|
|
|
|
|
} else {
|
|
|
|
|
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
|
|
|
|
|
pExp->base.type = QUERY_NODE_FUNCTION;
|
|
|
|
|
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
|
|
|
|
|
|
|
|
|
SDataType* pType = &pFuncNode->node.resType;
|
|
|
|
@ -8462,7 +8369,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|
|
|
|
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
|
|
|
|
for (int32_t j = 0; j < numOfParam; ++j) {
|
|
|
|
|
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
|
|
|
|
SColumnNode* pcn = (SColumnNode*)p1;
|
|
|
|
|
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
|
|
|
|
|
|
|
|
|
|
pCol->slotId = pcn->slotId;
|
|
|
|
|
pCol->bytes = pcn->node.resType.bytes;
|
|
|
|
@ -8471,6 +8378,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|
|
|
|
pCol->precision = pcn->node.resType.precision;
|
|
|
|
|
pCol->dataBlockId = pcn->dataBlockId;
|
|
|
|
|
}
|
|
|
|
|
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) {
|
|
|
|
|
pExp->base.type = QUERY_NODE_OPERATOR;
|
|
|
|
|
SOperatorNode* pNode = (SOperatorNode*) pTargetNode->pExpr;
|
|
|
|
|
|
|
|
|
|
SDataType* pType = &pNode->node.resType;
|
|
|
|
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
|
|
|
|
|
|
|
|
|
|
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
|
|
|
|
|
|
|
|
|
pCol->slotId = pTargetNode->slotId; // TODO refactor
|
|
|
|
|
pCol->bytes = pType->bytes;
|
|
|
|
|
pCol->type = pType->type;
|
|
|
|
|
pCol->scale = pType->scale;
|
|
|
|
|
pCol->precision = pType->precision;
|
|
|
|
|
} else {
|
|
|
|
|
ASSERT(0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|