diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02fa31ef07..1b4636120b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -923,10 +923,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask); void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg); int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt); -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, - int8_t isSucceed); +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, - int8_t isSucceed); + int32_t setCode); SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); void* streamDestroyStateMachine(SStreamTaskSM* pSM); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 95f70c8ff3..c3b5beb3e3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -37,7 +37,6 @@ typedef struct { int64_t applyIndex; uint64_t applyTerm; char user[TSDB_USER_LEN]; - } SRpcConnInfo; typedef struct SRpcHandleInfo { @@ -63,7 +62,6 @@ typedef struct SRpcHandleInfo { SRpcConnInfo conn; int8_t forbiddenIp; int8_t notFreeAhandle; - } SRpcHandleInfo; typedef struct SRpcMsg { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3a3b103e3f..ed8d06080a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1064,6 +1064,7 @@ _OVER: return code; } +// no matter what kinds of error happened, make sure the mnode will receive the success execution code. int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -1081,8 +1082,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); + SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return code; } @@ -1091,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1101,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1112,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1128,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1146,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1171,11 +1173,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) { tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 - " transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); + " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); + SRpcMsg rsp = {0}; + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } } @@ -1193,10 +1199,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); } - code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); + code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); + buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return code; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 08118f5231..d97214b700 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1580,12 +1580,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { key.len = compressedSize; value = dst; } - stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize); } if (*dest == NULL) { - char* p = taosMemoryCalloc( - 1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len); + size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len; + char* p = taosMemoryCalloc(1, size); char* buf = p; len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key.len); @@ -1601,8 +1600,8 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { len += taosEncodeFixedI8((void**)&buf, key.compress); len += taosEncodeBinary((void**)&buf, (char*)value, key.len); } - taosMemoryFree(dst); + taosMemoryFree(dst); return len; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d56b347f4c..d0fc923f83 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -833,7 +833,7 @@ FAIL: } int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, - int8_t isSucceed) { + int32_t setCode) { int32_t len = 0; int32_t code = 0; SEncoder encoder; @@ -845,7 +845,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf .streamId = pReq->streamId, .expireTime = pReq->expireTime, .mnodeId = pReq->mnodeId, - .success = isSucceed, + .success = (setCode == TSDB_CODE_SUCCESS) ? 1 : 0, }; tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code); @@ -866,22 +866,24 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf tEncoderClear(&encoder); initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len); + + pMsg->code = setCode; pMsg->info = *pRpcInfo; return 0; } -int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, - int8_t isSucceed) { +int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) { SStreamChkptReadyInfo info = {0}; - buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed); + buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); if (pTask->pReadyMsgList == NULL) { pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo)); } taosArrayPush(pTask->pReadyMsgList, &info); - stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, - (int32_t)taosArrayGetSize(pTask->pReadyMsgList)); + + int32_t size = taosArrayGetSize(pTask->pReadyMsgList); + stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size); return TSDB_CODE_SUCCESS; }