From f7ea875af1ca1525b0070574c130dc8ac279c942 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 23:27:48 +0800 Subject: [PATCH] fix(stream): add some logs. --- source/dnode/vnode/src/tq/tq.c | 49 ----------------------- source/libs/stream/src/streamCheckpoint.c | 9 +++-- 2 files changed, 5 insertions(+), 53 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 139fef14c8..1011a0cbfb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -237,55 +237,6 @@ void tqNotifyClose(STQ* pTq) { tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); } -// static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, -// int64_t consumerId, int32_t type) { -// int32_t len = 0; -// int32_t code = 0; -// -// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { -// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); -// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { -// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); -// } -// -// if (code < 0) { -// return -1; -// } -// -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// return -1; -// } -// -// ((SMqRspHead*)buf)->mqMsgType = type; -// ((SMqRspHead*)buf)->epoch = epoch; -// ((SMqRspHead*)buf)->consumerId = consumerId; -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); -// -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); -// -// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { -// tEncodeMqDataRsp(&encoder, pRsp); -// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { -// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); -// } -// -// tEncoderClear(&encoder); -// -// SRpcMsg rsp = { -// .info = *pRpcHandleInfo, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; -// -// tmsgSendRsp(&rsp); -// return 0; -// } - int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { SMqPollReq req = {0}; if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index adb587fdf8..56f8549b36 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -274,7 +274,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); p->chkInfo.checkpointId = p->checkpointingId; - p->status.taskStatus = TASK_STATUS__NORMAL; + streamSetStatusNormal(p); // save the task streamMetaSaveTask(pMeta, p); @@ -308,14 +308,15 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { ASSERT(remain >= 0); if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state + qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); + streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); - qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId); } else { - qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, not ready:%d/%d", pMeta->vgId, remain, - (int32_t)taosArrayGetSize(pMeta->pTaskList)); + qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", + pMeta->vgId, pTask->id.idStr, remain, (int32_t)taosArrayGetSize(pMeta->pTaskList)); } // send check point response to upstream task