fix(stream): always return success if repeatly recv checkpoint source msg.

This commit is contained in:
Haojun Liao 2024-05-07 16:33:08 +08:00
parent 0e0f98eb8d
commit b6debd985b
5 changed files with 29 additions and 25 deletions

View File

@ -923,10 +923,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed); int32_t setCode);
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM); void* streamDestroyStateMachine(SStreamTaskSM* pSM);

View File

@ -37,7 +37,6 @@ typedef struct {
int64_t applyIndex; int64_t applyIndex;
uint64_t applyTerm; uint64_t applyTerm;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SRpcConnInfo; } SRpcConnInfo;
typedef struct SRpcHandleInfo { typedef struct SRpcHandleInfo {
@ -63,7 +62,6 @@ typedef struct SRpcHandleInfo {
SRpcConnInfo conn; SRpcConnInfo conn;
int8_t forbiddenIp; int8_t forbiddenIp;
int8_t notFreeAhandle; int8_t notFreeAhandle;
} SRpcHandleInfo; } SRpcHandleInfo;
typedef struct SRpcMsg { typedef struct SRpcMsg {

View File

@ -1064,6 +1064,7 @@ _OVER:
return code; return code;
} }
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1081,8 +1082,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return code; return code;
} }
@ -1091,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1101,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it", ", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId); vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1112,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed", " transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId); vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1128,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1146,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1171,11 +1173,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
} else { // checkpoint already finished, and not in checkpoint status } else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) { if (req.checkpointId <= pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -1193,10 +1199,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
} }
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return code; return code;
} }

View File

@ -1580,12 +1580,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
key.len = compressedSize; key.len = compressedSize;
value = dst; value = dst;
} }
stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize);
} }
if (*dest == NULL) { if (*dest == NULL) {
char* p = taosMemoryCalloc( size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len); char* p = taosMemoryCalloc(1, size);
char* buf = p; char* buf = p;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len); len += taosEncodeFixedI32((void**)&buf, key.len);
@ -1601,8 +1600,8 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
len += taosEncodeFixedI8((void**)&buf, key.compress); len += taosEncodeFixedI8((void**)&buf, key.compress);
len += taosEncodeBinary((void**)&buf, (char*)value, key.len); len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
} }
taosMemoryFree(dst);
taosMemoryFree(dst);
return len; return len;
} }

View File

@ -833,7 +833,7 @@ FAIL:
} }
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed) { int32_t setCode) {
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
SEncoder encoder; SEncoder encoder;
@ -845,7 +845,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
.streamId = pReq->streamId, .streamId = pReq->streamId,
.expireTime = pReq->expireTime, .expireTime = pReq->expireTime,
.mnodeId = pReq->mnodeId, .mnodeId = pReq->mnodeId,
.success = isSucceed, .success = (setCode == TSDB_CODE_SUCCESS) ? 1 : 0,
}; };
tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code); tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
@ -866,22 +866,24 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
tEncoderClear(&encoder); tEncoderClear(&encoder);
initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len); initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len);
pMsg->code = setCode;
pMsg->info = *pRpcInfo; pMsg->info = *pRpcInfo;
return 0; return 0;
} }
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
int8_t isSucceed) {
SStreamChkptReadyInfo info = {0}; SStreamChkptReadyInfo info = {0};
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed); buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
if (pTask->pReadyMsgList == NULL) { if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
} }
taosArrayPush(pTask->pReadyMsgList, &info); taosArrayPush(pTask->pReadyMsgList, &info);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr,
(int32_t)taosArrayGetSize(pTask->pReadyMsgList)); int32_t size = taosArrayGetSize(pTask->pReadyMsgList);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }