feat(stream): partition by

This commit is contained in:
54liuyao 2022-06-14 11:15:10 +08:00
parent 03eca3072a
commit 22f86ee529
1 changed files with 10 additions and 0 deletions

View File

@ -582,6 +582,15 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
return offset;
}
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
void *ite = NULL;
while( (ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL ) {
taosArrayDestroy( ((SDataGroupInfo *)ite)->pPageList);
}
taosHashClear(pInfo->pGroupSet);
clearDiskbasedBuf(pInfo->pBuf);
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
@ -591,6 +600,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
if (pInfo->pGroupIter == NULL) {
doSetOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}