Merge pull request #25012 from taosdata/fix/TD-28997
load operator checkpoint
This commit is contained in:
commit
f4265092ce
|
@ -390,10 +390,12 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SCountWindowInfo curWin = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
|
||||||
|
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.twAggSup
|
// 2.twAggSup
|
||||||
|
@ -694,6 +696,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
||||||
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
|
||||||
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -704,8 +708,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
doStreamCountDecodeOpState(buff, len, pOperator, true);
|
doStreamCountDecodeOpState(buff, len, pOperator, true);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
|
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
||||||
|
|
|
@ -406,6 +406,7 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
// 4.checksum
|
// 4.checksum
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
@ -423,6 +424,8 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
|
||||||
|
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -735,6 +738,8 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->reCkBlock = false;
|
pInfo->reCkBlock = false;
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
|
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -746,8 +751,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
|
||||||
pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||||
|
|
|
@ -2533,7 +2533,6 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL
|
||||||
|
|
||||||
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
key->pStatePos->pRowBuff = NULL;
|
|
||||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -2591,6 +2590,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
|
@ -2609,6 +2609,8 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
|
||||||
|
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -2992,6 +2994,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||||
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||||
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -3002,8 +3006,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||||
|
@ -3538,6 +3540,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
|
@ -3556,6 +3559,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL,
|
||||||
|
pAggSup->stateKeySize, compareStateKey,
|
||||||
|
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -3873,6 +3879,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
|
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -3884,8 +3892,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
|
||||||
pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
||||||
|
|
|
@ -21,6 +21,8 @@ print create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0
|
||||||
|
|
||||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
|
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
@ -457,6 +459,8 @@ print create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0
|
||||||
|
|
||||||
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
|
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
|
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
|
||||||
|
@ -504,6 +508,9 @@ sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
||||||
print create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
print create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||||
|
|
||||||
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
||||||
|
@ -557,6 +564,8 @@ print create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0
|
||||||
|
|
||||||
sql create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart AS startts, min(c1),count(c1) from t1 state_window(c1);
|
sql create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart AS startts, min(c1),count(c1) from t1 state_window(c1);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 (ts, c1) values (1668073288209, 11);
|
sql insert into t1 (ts, c1) values (1668073288209, 11);
|
||||||
sql insert into t1 (ts, c1) values (1668073288210, 11);
|
sql insert into t1 (ts, c1) values (1668073288210, 11);
|
||||||
sql insert into t1 (ts, c1) values (1668073288211, 11);
|
sql insert into t1 (ts, c1) values (1668073288211, 11);
|
||||||
|
@ -745,6 +754,9 @@ sql create table b (c timestamp, d int, e int , f int, g double);
|
||||||
print create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
print create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
||||||
|
|
||||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
|
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
|
||||||
sql select * from streamt order by c1, c2, c3;
|
sql select * from streamt order by c1, c2, c3;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue