From 8b269ca955660a96907f3e0f9e123a61ac692745 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Jul 2024 15:12:43 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 49 ++--------------------- 1 file changed, 4 insertions(+), 45 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b490b0e02a..25974375e1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -433,8 +433,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV taosThreadMutexLock(&pTask->lock); if (pReq->checkpointId <= pInfo->checkpointId) { - stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 + stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 + " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, pReq->transId); @@ -1114,12 +1114,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { } int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { - int32_t code; - int32_t tlen = 0; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; - SCheckpointInfo* pInfo = &pTask->chkInfo; - + const char* id = pTask->id.idStr; taosThreadMutexLock(&pTask->lock); if (pTask->status.sendConsensusChkptId == true) { stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); @@ -1133,44 +1128,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { ASSERT(pTask->pBackend == NULL); pTask->status.requireConsensusChkptId = true; -#if 0 - SRestoreCheckpointInfo req = { - .streamId = pTask->id.streamId, - .taskId = pTask->id.taskId, - .nodeId = vgId, - .checkpointId = pInfo->checkpointId, - .startTs = pTask->execInfo.created, - }; - tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code); - if (code < 0) { - stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); - return -1; - } - - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return -1; - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) { - rpcFreeCont(buf); - stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code)); - return -1; - } - tEncoderClear(&encoder); - - SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, buf, tlen); - stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId, - pInfo->checkpointId); - - tmsgSendReq(&pTask->info.mnodeEpset, &msg); -#endif + stDebug("s-task:%s set the require consensus-checkpointId flag", id); return 0; }