feat(stream): add result group id
This commit is contained in:
parent
bf71325184
commit
4371165235
|
@ -660,6 +660,62 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
|||
taosMemoryFree(pBuf);
|
||||
}
|
||||
|
||||
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);
|
||||
|
||||
int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
|
||||
int firstPos = 0, lastPos = num - 1, midPos = -1;
|
||||
int numOfRows = 0;
|
||||
|
||||
if (num <= 0) return -1;
|
||||
if (order == TSDB_ORDER_DESC) {
|
||||
// find the first position which is smaller or equal than the key
|
||||
while (1) {
|
||||
if (comparefn(pKey, keyList, lastPos) >= 0) return lastPos;
|
||||
if (comparefn(pKey, keyList, firstPos) == 0) return firstPos;
|
||||
if (comparefn(pKey, keyList, firstPos) < 0) return firstPos - 1;
|
||||
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (comparefn(pKey, keyList, midPos) < 0) {
|
||||
lastPos = midPos - 1;
|
||||
} else if (comparefn(pKey, keyList, midPos) > 0) {
|
||||
firstPos = midPos + 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// find the first position which is bigger or equal than the key
|
||||
while (1) {
|
||||
if (comparefn(pKey, keyList, firstPos) <= 0) return firstPos;
|
||||
if (comparefn(pKey, keyList, lastPos) == 0) return lastPos;
|
||||
|
||||
if (comparefn(pKey, keyList, lastPos) > 0) {
|
||||
lastPos = lastPos + 1;
|
||||
if (lastPos >= num)
|
||||
return -1;
|
||||
else
|
||||
return lastPos;
|
||||
}
|
||||
|
||||
numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1) + firstPos;
|
||||
|
||||
if (comparefn(pKey, keyList, midPos) < 0) {
|
||||
lastPos = midPos - 1;
|
||||
} else if (comparefn(pKey, keyList, midPos) > 0) {
|
||||
firstPos = midPos + 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return midPos;
|
||||
}
|
||||
|
||||
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
|
||||
|
||||
int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) {
|
||||
|
@ -722,14 +778,31 @@ int64_t getReskey(void* data, int32_t index) {
|
|||
return *(int64_t*)pos->key;
|
||||
}
|
||||
|
||||
int32_t compareResKey(void* pKey, void* data, int32_t index) {
|
||||
SArray* res = (SArray*)data;
|
||||
SResKeyPos* pos = taosArrayGetP(res, index);
|
||||
SWinRes* pData = (SWinRes*) pKey;
|
||||
if (pData->ts == *(int64_t*)pos->key) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else if (pData->ts > *(int64_t*)pos->key) {
|
||||
return 1;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) {
|
||||
int32_t size = taosArrayGetSize(pUpdated);
|
||||
int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey);
|
||||
SWinRes data = {.ts = ts, .groupId = groupId};
|
||||
int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey);
|
||||
if (index == -1) {
|
||||
index = 0;
|
||||
} else {
|
||||
TSKEY resTs = getReskey(pUpdated, index);
|
||||
if (resTs < ts) {
|
||||
if (compareResKey(&data, pUpdated, index) > 0) {
|
||||
index++;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -753,10 +826,10 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda
|
|||
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
|
||||
}
|
||||
|
||||
static void removeResult(SArray* pUpdated, TSKEY key) {
|
||||
static void removeResult(SArray* pUpdated, SWinRes* pKey) {
|
||||
int32_t size = taosArrayGetSize(pUpdated);
|
||||
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
|
||||
if (index >= 0 && key == getReskey(pUpdated, index)) {
|
||||
int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey);
|
||||
if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) {
|
||||
taosArrayRemove(pUpdated, index);
|
||||
}
|
||||
}
|
||||
|
@ -765,7 +838,7 @@ static void removeResults(SArray* pWins, SArray* pUpdated) {
|
|||
int32_t size = taosArrayGetSize(pWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SWinRes* pW = taosArrayGet(pWins, i);
|
||||
removeResult(pUpdated, pW->ts);
|
||||
removeResult(pUpdated, pW);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -775,14 +848,30 @@ int64_t getWinReskey(void* data, int32_t index) {
|
|||
return pos->ts;
|
||||
}
|
||||
|
||||
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
||||
SArray* res = (SArray*)data;
|
||||
SWinRes* pos = taosArrayGetP(res, index);
|
||||
SResKeyPos* pData = (SResKeyPos*) pKey;
|
||||
if (*(int64_t*)pData->key == pos->ts) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else if (*(int64_t*)pData->key > pos->ts) {
|
||||
return 1;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) {
|
||||
int32_t upSize = taosArrayGetSize(pUpdated);
|
||||
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||
for (int32_t i = 0; i < upSize; i++) {
|
||||
SResKeyPos* pResKey = taosArrayGetP(pUpdated, i);
|
||||
int64_t key = *(int64_t*)pResKey->key;
|
||||
int32_t index = binarySearch(pDelWins, delSize, key, TSDB_ORDER_DESC, getWinReskey);
|
||||
if (index >= 0 && key == getWinReskey(pDelWins, index)) {
|
||||
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
|
||||
if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
|
||||
taosArrayRemove(pDelWins, index);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -462,6 +462,113 @@ if $data25 != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create database test2 vgroups 1
|
||||
sql show databases
|
||||
|
||||
sql use test2
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create table t3 using st tags(2,2,2);
|
||||
sql create table t4 using st tags(2,2,2);
|
||||
sql create table t5 using st tags(2,2,2);
|
||||
sql create stream streams2 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3,max(b) c4 from st partition by tbname interval(10s)
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0) t2 values(1648791213000,2,2,2,2.0) t3 values(1648791213000,3,3,3,3.0) t4 values(1648791213000,4,4,4,4.0);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt;
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,5,5,5,5.0) t2 values(1648791213000,6,6,6,6.0) t5 values(1648791213000,7,7,7,7.0);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt order by c4 desc;
|
||||
|
||||
if $rows != 5 then
|
||||
print =====rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
# row 0
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 7 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
# row 1
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data12 != 6 then
|
||||
print =====data12=$data12
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
# row 2
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data22 != 5 then
|
||||
print =====data22=$data22
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,8,8,8,8.0);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt order by c4 desc;
|
||||
|
||||
# row 0
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data02 != 8 then
|
||||
print =====data02=$data02
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -366,18 +366,21 @@ if $data32 != 8 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
#$loop_all = 0
|
||||
#looptest:
|
||||
|
||||
sql drop database IF EXISTS test2;
|
||||
sql drop stream IF EXISTS streams21;
|
||||
sql drop stream IF EXISTS streams22;
|
||||
|
||||
sql create database test2 vgroups 2;
|
||||
sql create database test2 vgroups 6;
|
||||
sql use test2;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
|
||||
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
|
||||
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
|
||||
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,2,1.1);
|
||||
|
@ -394,7 +397,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
|
|||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
sleep 300
|
||||
sleep 100
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
|
@ -452,7 +455,7 @@ print step 6
|
|||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
sleep 300
|
||||
# sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
|
@ -464,7 +467,7 @@ sql select * from streamt2;
|
|||
# row 0
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
# goto loop3
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data02 != 10 then
|
||||
|
@ -505,4 +508,9 @@ if $data32 != 8 then
|
|||
goto loop3
|
||||
endi
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
#goto looptest
|
||||
|
||||
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue