fix(query): avoid the output result overlap within the project operator buffer.
This commit is contained in:
parent
9864e367bb
commit
9d1a9d9510
|
@ -203,6 +203,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
|
||||||
if (pSource->hasNull) {
|
if (pSource->hasNull) {
|
||||||
pColumnInfoData->hasNull = pSource->hasNull;
|
pColumnInfoData->hasNull = pSource->hasNull;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
// Handle the bitmap
|
// Handle the bitmap
|
||||||
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
|
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
|
||||||
|
|
|
@ -1171,31 +1171,43 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
||||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||||
|
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||||
|
|
||||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
if (pResult->info.rows > 0) {
|
||||||
|
colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
|
} else {
|
||||||
|
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
pResult->info.rows = pSrcBlock->info.rows;
|
numOfRows = pfCtx->input.numOfRows;
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
|
int32_t offset = pResult->info.rows;
|
||||||
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
||||||
colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
||||||
}
|
}
|
||||||
pResult->info.rows = pSrcBlock->info.rows;
|
|
||||||
|
numOfRows = pSrcBlock->info.rows;
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
|
||||||
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||||
taosArrayPush(pBlockList, &pSrcBlock);
|
taosArrayPush(pBlockList, &pSrcBlock);
|
||||||
|
|
||||||
SScalarParam dest = {0};
|
SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData idata = {.info = pResColData->info};
|
||||||
|
|
||||||
|
SScalarParam dest = {.columnData = &idata};
|
||||||
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
||||||
pResult->info.rows = dest.numOfRows;
|
|
||||||
|
|
||||||
|
colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows);
|
||||||
|
|
||||||
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||||
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
||||||
|
@ -1212,28 +1224,33 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
pfCtx->offset = pResult->info.rows; // set the start offset
|
pfCtx->offset = pResult->info.rows; // set the start offset
|
||||||
|
|
||||||
|
// set the timestamp(_rowts) output buffer
|
||||||
if (taosArrayGetSize(pPseudoList) > 0) {
|
if (taosArrayGetSize(pPseudoList) > 0) {
|
||||||
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
|
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
|
||||||
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfRows = pfCtx->fpSet.process(pfCtx);
|
numOfRows = pfCtx->fpSet.process(pfCtx);
|
||||||
pResult->info.rows += numOfRows;
|
|
||||||
} else {
|
} else {
|
||||||
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||||
taosArrayPush(pBlockList, &pSrcBlock);
|
taosArrayPush(pBlockList, &pSrcBlock);
|
||||||
|
|
||||||
SScalarParam dest = {0};
|
SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData idata = {.info = pResColData->info};
|
||||||
|
|
||||||
|
SScalarParam dest = {.columnData = &idata};
|
||||||
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||||
pResult->info.rows = dest.numOfRows;
|
colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows);
|
||||||
|
|
||||||
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pResult->info.rows += numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
||||||
|
|
Loading…
Reference in New Issue