add checkpoint

This commit is contained in:
yihaoDeng 2023-07-10 17:33:48 +08:00
parent 4d838d3dff
commit e263d5f55f
3 changed files with 25 additions and 25 deletions

View File

@ -885,7 +885,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
return 0;
}
int64_t checkpointId = tGenIdPI64();
int64_t checkpointId = taosGetTimestampMs();
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = checkpointId;
@ -994,7 +994,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
mndTransDrop(pTrans);
return -1;
}
mDebug("start to trigger checkpoint for stream:%s", pStream->name);
mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
atomic_store_64(&pStream->currentTick, 1);
taosWLockLatch(&pStream->lock);
// 1. redo action: broadcast checkpoint source msg for all source vg

View File

@ -106,7 +106,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedTimer == NULL);
qDebug("s-task:%s setup scheduler trigger, delay:%"PRId64" ms", pTask->id.idStr, pTask->triggerParam);
qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->triggerParam);
pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
@ -155,7 +155,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
} else {
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
}
// rsp by input status
@ -183,8 +183,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue
if (pData != NULL) {
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
@ -265,7 +265,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}
return TSDB_CODE_SUCCESS;
@ -304,7 +303,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el);
qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el);
pTask->msgInfo.blockingTs = 0;
}
@ -350,9 +349,10 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size);
qError(
"s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push "
"data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1;
@ -366,21 +366,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
}
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0);
px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen / 1048576.0);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size);
destroyStreamDataBlock((SStreamDataBlock*) pItem);
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return -1;
}
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*) pItem);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return code;
}
} else if (type == STREAM_INPUT__CHECKPOINT) {

View File

@ -202,6 +202,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointNotReadyTasks = 1;
pTask->checkpointingId = pReq->checkpointId;
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else.
streamTaskDispatchCheckpointMsg(pTask, checkpointId);