feat(stream): optimize result
This commit is contained in:
parent
b28553395f
commit
b7e84b3caf
|
@ -1395,6 +1395,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
||||||
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
||||||
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
|
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
|
||||||
|
taosMemoryFreeClear(pStreamScan->pTableScanOp);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (pStreamScan->tqReader) {
|
if (pStreamScan->tqReader) {
|
||||||
|
|
|
@ -2762,11 +2762,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
} else {
|
} else {
|
||||||
|
if (!IS_FINAL_OP(pInfo)) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
if (pInfo->binfo.pRes->info.rows != 0) {
|
if (pInfo->binfo.pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
if (pInfo->pUpdateRes->info.rows != 0 && pInfo->returnUpdate) {
|
||||||
pInfo->returnUpdate = false;
|
pInfo->returnUpdate = false;
|
||||||
ASSERT(!IS_FINAL_OP(pInfo));
|
ASSERT(!IS_FINAL_OP(pInfo));
|
||||||
|
@ -3108,6 +3110,10 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosMemoryFreeClear(pChInfo);
|
taosMemoryFreeClear(pChInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
|
blockDataDestroy(pInfo->pWinBlock);
|
||||||
|
blockDataDestroy(pInfo->pUpdateRes);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue