fix(stream): gen checkpoint for single task.
This commit is contained in:
parent
db474626e6
commit
0cd84aa587
|
@ -3033,7 +3033,7 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doAddTaskId(SArray* pList, int32_t taskId) {
|
||||
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
int32_t* pId = taosArrayGet(pList, i);
|
||||
|
@ -3043,6 +3043,9 @@ static void doAddTaskId(SArray* pList, int32_t taskId) {
|
|||
}
|
||||
|
||||
taosArrayPush(pList, &taskId);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pList);
|
||||
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
|
||||
}
|
||||
|
||||
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||
|
@ -3067,35 +3070,33 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||
|
||||
void **pReqTaskList = taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||
SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||
if (pReqTaskList == NULL) {
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
doAddTaskId(pList, req.taskId);
|
||||
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
|
||||
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
||||
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", pStream->uid, 1, numOfTasks - 1);
|
||||
|
||||
pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||
} else {
|
||||
doAddTaskId(*pReqTaskList, req.taskId);
|
||||
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
|
||||
}
|
||||
|
||||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||
if (total == numOfTasks) { // all tasks has send the reqs
|
||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
||||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||
if (total == numOfTasks) { // all tasks has send the reqs
|
||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
||||
|
||||
// TODO:handle error
|
||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||
// TODO:handle error
|
||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||
|
||||
// remove this entry
|
||||
taosArrayDestroy(*(SArray**)pReqTaskList);
|
||||
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
|
||||
// remove this entry
|
||||
taosArrayDestroy(*pReqTaskList);
|
||||
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
|
||||
|
||||
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
|
||||
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
|
||||
} else {
|
||||
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", pStream->uid, total, numOfTasks - total);
|
||||
}
|
||||
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
|
||||
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
|
|
|
@ -18,6 +18,7 @@ sql use test;
|
|||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
|
||||
sql create stream stream1 trigger at_once fill_history 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,3,1.1);
|
||||
|
@ -224,53 +225,53 @@ endi
|
|||
|
||||
# row 2
|
||||
if $data21 != 1 then
|
||||
print ======$data21
|
||||
print ======$data21, expect 1
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data22 != 1 then
|
||||
print ======$data22
|
||||
print ======$data22 , expect 1
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data23 != 3 then
|
||||
print ======$data23
|
||||
print ======$data23 , expect 3
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data24 != 2 then
|
||||
print ======$data24
|
||||
print ======$data24 , expect 2
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data25 != 3 then
|
||||
print ======$data25
|
||||
print ======$data25 , expect 3
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
# row 3
|
||||
if $data31 != 1 then
|
||||
print ======$data31
|
||||
print ======$data31 , expect 1
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data32 != 1 then
|
||||
print ======$data32
|
||||
print ======$data32 , expect 1
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data33 != 4 then
|
||||
print ======$data33
|
||||
print ======$data33 , expect 4
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data34 != 2 then
|
||||
print ======$data34
|
||||
print ======$data34 , expect 2
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
if $data35 != 3 then
|
||||
print ======$data35
|
||||
print ======$data35 , expect 3
|
||||
goto loop01
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue