|
|
|
@ -571,8 +571,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|
|
|
|
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
|
|
|
|
pResult->info.groupId = pSrcBlock->info.groupId;
|
|
|
|
|
|
|
|
|
|
// if the source equals to the destination, it is to create a new column as the result of scalar function or some
|
|
|
|
|
// operators.
|
|
|
|
|
// if the source equals to the destination, it is to create a new column as the result of scalar
|
|
|
|
|
// function or some operators.
|
|
|
|
|
bool createNewColModel = (pResult == pSrcBlock);
|
|
|
|
|
|
|
|
|
|
int32_t numOfRows = 0;
|
|
|
|
@ -580,17 +580,17 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|
|
|
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
|
|
|
|
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
|
|
|
|
SqlFunctionCtx* pfCtx = &pCtx[k];
|
|
|
|
|
SInputColumnInfoData* pInputData = &pfCtx->input;
|
|
|
|
|
|
|
|
|
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
|
|
|
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
|
|
|
|
if (pResult->info.rows > 0 && !createNewColModel) {
|
|
|
|
|
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0],
|
|
|
|
|
pfCtx->input.numOfRows);
|
|
|
|
|
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
|
|
|
|
|
} else {
|
|
|
|
|
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows, &pResult->info);
|
|
|
|
|
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
numOfRows = pfCtx->input.numOfRows;
|
|
|
|
|
numOfRows = pInputData->numOfRows;
|
|
|
|
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
|
|
|
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
|
|
|
|
|
|
|
|
@ -623,14 +623,12 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|
|
|
|
numOfRows = dest.numOfRows;
|
|
|
|
|
taosArrayDestroy(pBlockList);
|
|
|
|
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
|
|
|
|
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
|
|
|
|
|
|
|
|
|
// _rowts/_c0, not tbname column
|
|
|
|
|
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
|
|
|
|
|
// do nothing
|
|
|
|
|
} else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
|
|
|
|
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
|
|
|
|
|
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
|
|
|
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
|
|
|
|
|
pfCtx->fpSet.init(pfCtx, pResInfo);
|
|
|
|
|
|
|
|
|
|
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
|
|
|
|
pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset
|
|
|
|
@ -642,6 +640,23 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
numOfRows = pfCtx->fpSet.process(pfCtx);
|
|
|
|
|
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
|
|
|
|
// _group_key function for "partition by tbname" + csum(col_name) query
|
|
|
|
|
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
|
|
|
|
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
|
|
|
|
|
|
|
|
|
// todo handle the json tag
|
|
|
|
|
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
|
|
|
|
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
|
|
|
|
bool isNull = colDataIsNull_s(pInput, f);
|
|
|
|
|
if (isNull) {
|
|
|
|
|
colDataAppendNULL(pOutput, pResult->info.rows + f);
|
|
|
|
|
} else {
|
|
|
|
|
char* data = colDataGetData(pInput, f);
|
|
|
|
|
colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
|
|
|
|
taosArrayPush(pBlockList, &pSrcBlock);
|
|
|
|
@ -675,25 +690,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
|
|
|
|
if (IS_VAR_DATA_TYPE(type)) {
|
|
|
|
|
// todo disable this
|
|
|
|
|
|
|
|
|
|
// if (pResultRow->key == NULL) {
|
|
|
|
|
// pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
|
|
|
|
|
// varDataCopy(pResultRow->key, pData);
|
|
|
|
|
// } else {
|
|
|
|
|
// ASSERT(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
|
|
|
|
// }
|
|
|
|
|
} else {
|
|
|
|
|
int64_t v = -1;
|
|
|
|
|
GET_TYPED_DATA(v, int64_t, type, pData);
|
|
|
|
|
|
|
|
|
|
pResultRow->win.skey = v;
|
|
|
|
|
pResultRow->win.ekey = v;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
|
|
|
|
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
|
|
|
|
|
|
|
|
@ -3825,6 +3821,40 @@ _error:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
int32_t order = 0;
|
|
|
|
|
int32_t scanFlag = 0;
|
|
|
|
|
|
|
|
|
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
|
|
|
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
|
|
|
|
SExprSupp* pSup = &pOperator->exprSupp;
|
|
|
|
|
|
|
|
|
|
// the pDataBlock are always the same one, no need to call this again
|
|
|
|
|
int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
|
|
|
|
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
|
|
|
|
|
if (pScalarSup->pExprInfo != NULL) {
|
|
|
|
|
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
|
|
|
|
pIndefInfo->pPseudoColInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
|
|
|
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
|
|
|
|
|
|
|
|
|
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
|
|
|
|
|
pIndefInfo->pPseudoColInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|
|
|
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
|
|
|
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
|
|
|
@ -3839,8 +3869,6 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t st = 0;
|
|
|
|
|
int32_t order = 0;
|
|
|
|
|
int32_t scanFlag = 0;
|
|
|
|
|
|
|
|
|
|
if (pOperator->cost.openCost == 0) {
|
|
|
|
|
st = taosGetTimestampUs();
|
|
|
|
@ -3848,42 +3876,54 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
|
|
|
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
|
|
|
|
if (pBlock == NULL) {
|
|
|
|
|
doSetOperatorCompleted(pOperator);
|
|
|
|
|
break;
|
|
|
|
|
while(1) {
|
|
|
|
|
// here we need to handle the existsed group results
|
|
|
|
|
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
|
|
|
|
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
|
|
|
|
SqlFunctionCtx* pCtx = &pSup->pCtx[k];
|
|
|
|
|
|
|
|
|
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
|
|
|
|
pResInfo->initialized = false;
|
|
|
|
|
pCtx->pOutput = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
|
|
|
|
|
pIndefInfo->pNextGroupRes = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// the pDataBlock are always the same one, no need to call this again
|
|
|
|
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
}
|
|
|
|
|
if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
|
|
|
|
|
while (1) {
|
|
|
|
|
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
|
|
|
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
|
|
|
|
if (pBlock == NULL) {
|
|
|
|
|
doSetOperatorCompleted(pOperator);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
|
|
|
|
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
|
|
|
|
|
if (pScalarSup->pExprInfo != NULL) {
|
|
|
|
|
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
|
|
|
|
pIndefInfo->pPseudoColInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
|
|
|
|
|
pIndefInfo->groupId = pBlock->info.groupId; // this is the initial group result
|
|
|
|
|
} else {
|
|
|
|
|
if (pIndefInfo->groupId != pBlock->info.groupId) { // reset output buffer and computing status
|
|
|
|
|
pIndefInfo->groupId = pBlock->info.groupId;
|
|
|
|
|
pIndefInfo->pNextGroupRes = pBlock;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
|
|
|
|
|
if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
|
|
|
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
|
|
|
|
|
|
|
|
|
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx,
|
|
|
|
|
pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
doFilter(pIndefInfo->pCondition, pInfo->pRes);
|
|
|
|
|
size_t rows = pInfo->pRes->info.rows;
|
|
|
|
|
if (rows >= 0) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doFilter(pIndefInfo->pCondition, pInfo->pRes);
|
|
|
|
|
|
|
|
|
|
size_t rows = pInfo->pRes->info.rows;
|
|
|
|
|
pOperator->resultInfo.totalRows += rows;
|
|
|
|
|
|
|
|
|
@ -3928,24 +3968,23 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|
|
|
|
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
|
|
|
|
numOfRows = TWOMB / pResBlock->info.rowSize;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
initResultSizeInfo(pOperator, numOfRows);
|
|
|
|
|
|
|
|
|
|
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
|
|
|
|
|
initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
|
|
|
|
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
|
|
|
|
|
|
|
|
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
|
|
|
|
|
|
|
|
|
pInfo->binfo.pRes = pResBlock;
|
|
|
|
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
|
|
|
|
pInfo->pCondition = pPhyNode->node.pConditions;
|
|
|
|
|
pInfo->binfo.pRes = pResBlock;
|
|
|
|
|
pInfo->pCondition = pPhyNode->node.pConditions;
|
|
|
|
|
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
|
|
|
|
|
|
|
|
|
pOperator->name = "IndefinitOperator";
|
|
|
|
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
|
|
|
|
pOperator->blocking = false;
|
|
|
|
|
pOperator->status = OP_NOT_OPENED;
|
|
|
|
|
pOperator->info = pInfo;
|
|
|
|
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
|
|
|
pOperator->exprSupp.numOfExprs = numOfExpr;
|
|
|
|
|
pOperator->name = "IndefinitOperator";
|
|
|
|
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
|
|
|
|
|
pOperator->blocking = false;
|
|
|
|
|
pOperator->status = OP_NOT_OPENED;
|
|
|
|
|
pOperator->info = pInfo;
|
|
|
|
|
pOperator->pTaskInfo = pTaskInfo;
|
|
|
|
|
|
|
|
|
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
|
|
|
|