fix:stddev\fisrt\last combine function
This commit is contained in:
parent
7c56979ac6
commit
d7a38aeae9
|
@ -423,6 +423,8 @@ typedef struct SGroupKeyInfo {
|
|||
(_p).val = (_v); \
|
||||
} while (0)
|
||||
|
||||
static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst);
|
||||
|
||||
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (pResultInfo->initialized) {
|
||||
return false;
|
||||
|
@ -457,11 +459,12 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
|||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts > pSBuf->ts)) {
|
||||
memcpy(pDBuf->buf, pSBuf->buf, bytes);
|
||||
pDBuf->ts = pSBuf->ts;
|
||||
pDResInfo->numOfRes = 1;
|
||||
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, true)) {
|
||||
pDBuf->hasResult = true;
|
||||
}
|
||||
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
pDResInfo->isNullRes &= pSResInfo->isNullRes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1274,17 +1277,8 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
|||
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->isum += pSBuf->isum;
|
||||
pDBuf->quadraticISum += pSBuf->quadraticISum;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||
pDBuf->usum += pSBuf->usum;
|
||||
pDBuf->quadraticUSum += pSBuf->quadraticUSum;
|
||||
} else {
|
||||
pDBuf->dsum += pSBuf->dsum;
|
||||
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
|
||||
}
|
||||
pDBuf->count += pSBuf->count;
|
||||
stddevTransferInfo(pSBuf, pDBuf);
|
||||
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
pDResInfo->isNullRes &= pSResInfo->isNullRes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2289,16 +2283,15 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
||||
int32_t rowIndex) {
|
||||
static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) {
|
||||
if (pOutput->hasResult) {
|
||||
if (isFirst) {
|
||||
if (pInput->ts > pOutput->ts) {
|
||||
return;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
} else {
|
||||
if (pInput->ts < pOutput->ts) {
|
||||
return;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2308,9 +2301,15 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
|
|||
pOutput->bytes = pInput->bytes;
|
||||
|
||||
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pOutput->hasResult = true;
|
||||
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
|
||||
int32_t rowIndex) {
|
||||
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
|
||||
pOutput->hasResult = true;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
|
||||
|
@ -2378,7 +2377,6 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
// todo rewrite:
|
||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||
|
@ -2387,11 +2385,12 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
|||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
|
||||
if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || pDBuf->ts < pSBuf->ts)) {
|
||||
memcpy(pDBuf->buf, pSBuf->buf, bytes);
|
||||
pDBuf->ts = pSBuf->ts;
|
||||
pDResInfo->numOfRes = 1;
|
||||
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, false)) {
|
||||
pDBuf->hasResult = true;
|
||||
}
|
||||
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
pDResInfo->isNullRes &= pSResInfo->isNullRes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -197,4 +197,98 @@ if $data01 != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create database test1 vgroups 1;
|
||||
sql use test1;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams3 trigger at_once into streamt3 as select _wstart, count(*) c1 from t1 where a > 5 session(ts, 5s);
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop13:
|
||||
sleep 200
|
||||
|
||||
sql select * from streamt3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,6,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop14:
|
||||
sleep 200
|
||||
sql select * from streamt3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,2,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop15:
|
||||
sleep 200
|
||||
sql select * from streamt3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop15
|
||||
endi
|
||||
|
||||
|
||||
sql insert into t1 values(1648791223000,2,2,3,1.0);
|
||||
sql insert into t1 values(1648791223000,10,2,3,1.0);
|
||||
sql insert into t1 values(1648791233000,10,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop16:
|
||||
sleep 200
|
||||
sql select * from streamt3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop16
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791233000,2,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop17:
|
||||
sleep 200
|
||||
sql select * from streamt3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop17
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue