fix:[TS-5776]avoid memcpy fo DataRspObj

This commit is contained in:
wangmm0220 2025-02-12 11:09:32 +08:00
parent 020840bd12
commit 891a6057ca
2 changed files with 43 additions and 36 deletions

View File

@ -2420,6 +2420,35 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
return code; return code;
} }
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
int32_t code = 0;
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
pRspWrapper->pollRsp.data = NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
pRspWrapper->pollRsp.data = NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
pRspWrapper->pollRsp.data = NULL;
} else {
tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
END:
return code;
}
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
int32_t code = 0; int32_t code = 0;
SMqRspObj* pRspObj = NULL; SMqRspObj* pRspObj = NULL;
@ -2432,6 +2461,10 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
return pRspObj; return pRspObj;
} }
code = processWrapperData(pRspWrapper);
if (code != 0) {
goto END;
}
SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp; SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
taosWLockLatch(&tmq->lock); taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL; SMqClientVg* pVg = NULL;
@ -2508,33 +2541,6 @@ END:
return pRspObj; return pRspObj;
} }
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
int32_t code = 0;
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
pRspWrapper->pollRsp.data = NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
pRspWrapper->pollRsp.data = NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
pRspWrapper->pollRsp.data = NULL;
} else {
tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
END:
return code;
}
static void* tmqHandleAllRsp(tmq_t* tmq) { static void* tmqHandleAllRsp(tmq_t* tmq) {
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
@ -2554,14 +2560,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq) {
} }
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
code = processWrapperData(pRspWrapper); if (pRspWrapper->code != 0) {
if (code == 0){ code = processMqRspError(tmq, pRspWrapper);
if (pRspWrapper->code != 0) { }else{
code = processMqRspError(tmq, pRspWrapper); returnVal = processMqRsp(tmq, pRspWrapper);
}else{ code = terrno;
returnVal = processMqRsp(tmq, pRspWrapper);
code = terrno;
}
} }
tmqFreeRspWrapper(pRspWrapper); tmqFreeRspWrapper(pRspWrapper);

View File

@ -31,6 +31,7 @@ taosd = "taosd"
taosxLog = "taosx.log" taosxLog = "taosx.log"
taosxTimeout = 2 taosxTimeout = 2
timeCost = [] timeCost = []
speedupStr = []
insertJson = '''{ insertJson = '''{
"filetype": "insert", "filetype": "insert",
"cfgdir": "/etc/taos", "cfgdir": "/etc/taos",
@ -270,7 +271,10 @@ if __name__ == "__main__":
if i % 2 == 1 : if i % 2 == 1 :
print(f"opti cost:{float(timeCost[0]) - taosxTimeout}") print(f"opti cost:{float(timeCost[0]) - taosxTimeout}")
print(f"old cost:{float(timeCost[1]) - taosxTimeout}") print(f"old cost:{float(timeCost[1]) - taosxTimeout}")
print(str(paras[i]) + f" speedup:{(float(timeCost[1]) - taosxTimeout)/(float(timeCost[0]) - taosxTimeout)}\n\n\n") tmp = str(paras[i]) + f" speedup:{(float(timeCost[1]) - taosxTimeout)/(float(timeCost[0]) - taosxTimeout)}"
speedupStr.append(tmp)
print(tmp + "\n\n\n")
timeCost.clear() timeCost.clear()
print("performance result:\n" + str(speedupStr))
tdLog.info("run performance end") tdLog.info("run performance end")