feat(stream):delete window

This commit is contained in:
54liuyao 2022-08-11 16:31:51 +08:00
parent a07e9068d0
commit c10a1adbd3
3 changed files with 13 additions and 6 deletions

View File

@ -3586,9 +3586,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false); // todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; // todo(liuyao) for delete
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL;
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;

View File

@ -748,6 +748,7 @@ int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
pDBuf->dsum += pSBuf->dsum;
}
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -1746,6 +1747,7 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
}
}
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -2121,6 +2123,7 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
}
pDBuf->count += pSBuf->count;
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -2311,6 +2314,7 @@ int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
pDparam[1][2] += pSparam[1][2];
pDBuf->num += pSBuf->num;
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -2707,6 +2711,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
apercentileTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -3890,6 +3895,7 @@ int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SSpreadInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
spreadTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -4062,6 +4068,7 @@ int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
elapsedTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -4379,6 +4386,7 @@ int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
histogramTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}
@ -4576,6 +4584,7 @@ int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
hllTransferInfo(pSBuf, pDBuf);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}

View File

@ -5,15 +5,15 @@ sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
sql create database test vgroups 1;
sql show databases;
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);