diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 63208538a3..8ce9ff6def 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -885,7 +885,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { return 0; } - int64_t checkpointId = tGenIdPI64(); + int64_t checkpointId = taosGetTimestampMs(); SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); pMsg->checkpointId = checkpointId; @@ -994,7 +994,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre mndTransDrop(pTrans); return -1; } - mDebug("start to trigger checkpoint for stream:%s", pStream->name); + mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); atomic_store_64(&pStream->currentTick, 1); taosWLockLatch(&pStream->lock); // 1. redo action: broadcast checkpoint source msg for all source vg diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index f772237635..5838a5bf0f 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,10 +16,10 @@ #include "streamInt.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define ONE_MB_F (1048576.0) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) +#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) +#define ONE_MB_F (1048576.0) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) SStreamGlobalEnv streamEnv; @@ -106,7 +106,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); ASSERT(ref == 2 && pTask->schedTimer == NULL); - qDebug("s-task:%s setup scheduler trigger, delay:%"PRId64" ms", pTask->id.idStr, pTask->triggerParam); + qDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->triggerParam); pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; @@ -155,7 +155,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR } else { int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now - status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; + status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; } // rsp by input status @@ -183,8 +183,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, - pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->srcVgId = 0; @@ -265,7 +265,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (ret != TSDB_CODE_SUCCESS) { - } return TSDB_CODE_SUCCESS; @@ -291,20 +290,20 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // so the TASK_INPUT_STATUS_BLOCKED is rsp // todo blocking the output status if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - int32_t waitDuration = 300; // 300 ms + int32_t waitDuration = 300; // 300 ms qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration); streamRetryDispatchStreamBlock(pTask, waitDuration); - } else { // pipeline send data in output queue + } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; if (pTask->msgInfo.blockingTs != 0) { int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; - qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el); + qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el); pTask->msgInfo.blockingTs = 0; } @@ -350,9 +349,10 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { - qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, - size); + qError( + "s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push " + "data", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); return -1; @@ -366,27 +366,26 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0); + px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen / 1048576.0); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, - size); - destroyStreamDataBlock((SStreamDataBlock*) pItem); + pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; } qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { - destroyStreamDataBlock((SStreamDataBlock*) pItem); + destroyStreamDataBlock((SStreamDataBlock*)pItem); return code; } } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); - } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. + } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9233b608b5..6f726480c5 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -160,7 +160,7 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec req.downstreamTaskId = pVgInfo->taskId; streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - } else { // no need to dispatch msg to downstream task + } else { // no need to dispatch msg to downstream task qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr); streamProcessCheckpointRsp(NULL, pTask); } @@ -202,6 +202,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointNotReadyTasks = 1; + pTask->checkpointingId = pReq->checkpointId; // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. streamTaskDispatchCheckpointMsg(pTask, checkpointId);