refactor: do some internal refactor.
This commit is contained in:
parent
cb42806148
commit
8b269ca955
|
@ -433,8 +433,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
if (pReq->checkpointId <= pInfo->checkpointId) {
|
if (pReq->checkpointId <= pInfo->checkpointId) {
|
||||||
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
|
||||||
" no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
" no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64
|
||||||
" transId:%d ignored",
|
" transId:%d ignored",
|
||||||
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
|
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
|
||||||
pReq->transId);
|
pReq->transId);
|
||||||
|
@ -1114,12 +1114,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
int32_t code;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t tlen = 0;
|
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
SCheckpointInfo* pInfo = &pTask->chkInfo;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
if (pTask->status.sendConsensusChkptId == true) {
|
if (pTask->status.sendConsensusChkptId == true) {
|
||||||
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
|
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
|
||||||
|
@ -1133,44 +1128,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
ASSERT(pTask->pBackend == NULL);
|
ASSERT(pTask->pBackend == NULL);
|
||||||
pTask->status.requireConsensusChkptId = true;
|
pTask->status.requireConsensusChkptId = true;
|
||||||
#if 0
|
|
||||||
SRestoreCheckpointInfo req = {
|
|
||||||
.streamId = pTask->id.streamId,
|
|
||||||
.taskId = pTask->id.taskId,
|
|
||||||
.nodeId = vgId,
|
|
||||||
.checkpointId = pInfo->checkpointId,
|
|
||||||
.startTs = pTask->execInfo.created,
|
|
||||||
};
|
|
||||||
|
|
||||||
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
|
stDebug("s-task:%s set the require consensus-checkpointId flag", id);
|
||||||
if (code < 0) {
|
|
||||||
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
|
|
||||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, buf, tlen);
|
|
||||||
if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) {
|
|
||||||
rpcFreeCont(buf);
|
|
||||||
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
SRpcMsg msg = {0};
|
|
||||||
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, buf, tlen);
|
|
||||||
stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId,
|
|
||||||
pInfo->checkpointId);
|
|
||||||
|
|
||||||
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
|
||||||
#endif
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue