enh: add procedures on server for udf/udaf in nested queries where outer query is
constant table
This commit is contained in:
parent
0ec69ec3fe
commit
cde9eac954
|
@ -615,14 +615,59 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
|
||||||
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
||||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||||
|
|
||||||
ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
int32_t type = pExpr[k].base.pParam[0].param.nType;
|
int32_t type = pExpr[k].base.pParam[0].param.nType;
|
||||||
if (TSDB_DATA_TYPE_NULL == type) {
|
if (TSDB_DATA_TYPE_NULL == type) {
|
||||||
colDataSetNNULL(pColInfoData, 0, 1);
|
colDataSetNNULL(pColInfoData, 0, 1);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
|
colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
|
||||||
|
}
|
||||||
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||||
|
SqlFunctionCtx* pfCtx = &pSup->pCtx[k];
|
||||||
|
|
||||||
|
if (fmIsAggFunc(pfCtx->functionId)) {
|
||||||
|
// selective value output should be set during corresponding function execution
|
||||||
|
if (fmIsSelectValueFunc(pfCtx->functionId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pOutput = taosArrayGet(pRes->pDataBlock, outputSlotId);
|
||||||
|
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||||
|
|
||||||
|
// todo handle the json tag
|
||||||
|
//SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||||
|
//for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||||
|
// bool isNull = colDataIsNull_s(pInput, f);
|
||||||
|
// if (isNull) {
|
||||||
|
// colDataSetNULL(pOutput, pRes->info.rows + f);
|
||||||
|
// } else {
|
||||||
|
// char* data = colDataGetData(pInput, f);
|
||||||
|
// colDataSetVal(pOutput, pRes->info.rows + f, data, isNull);
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
} else {
|
||||||
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
taosArrayPush(pBlockList, &pRes);
|
||||||
|
|
||||||
|
SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
|
||||||
|
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
|
||||||
|
|
||||||
|
SScalarParam dest = {.columnData = &idata};
|
||||||
|
int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosArrayDestroy(pBlockList);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t startOffset = pRes->info.rows;
|
||||||
|
ASSERT(pRes->info.capacity > 0);
|
||||||
|
colDataMergeCol(pResColData, startOffset, (int32_t*)&pRes->info.capacity, &idata, dest.numOfRows);
|
||||||
|
colDataDestroy(&idata);
|
||||||
|
|
||||||
|
taosArrayDestroy(pBlockList);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1694,7 +1694,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
SCL_ERR_JRET(TSDB_CODE_APP_ERROR);
|
SCL_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (1 == res->numOfRows) {
|
SSDataBlock *pb = taosArrayGetP(pBlockList, 0);
|
||||||
|
if (1 == res->numOfRows && pb->info.rows > 0) {
|
||||||
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
|
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
|
||||||
} else {
|
} else {
|
||||||
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true);
|
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true);
|
||||||
|
|
Loading…
Reference in New Issue