From 891a6057caed721b878e1e7cf1b576ff7f01c32f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 12 Feb 2025 11:09:32 +0800 Subject: [PATCH] fix:[TS-5776]avoid memcpy fo DataRspObj --- source/client/src/clientTmq.c | 73 ++++++++++---------- tests/system-test/7-tmq/taosx-performance.py | 6 +- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d2b6aff2bf..4b2360ea27 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2420,6 +2420,35 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ return code; } + +static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){ + int32_t code = 0; + if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { + PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data; + pRspWrapper->pollRsp.data = NULL; + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp) + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { + PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data; + pRspWrapper->pollRsp.data = NULL; + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { + PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead); + pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)); + pRspWrapper->pollRsp.data = NULL; + } else { + tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType); + code = TSDB_CODE_TSC_INTERNAL_ERROR; + goto END; + } + END: + return code; +} + static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ int32_t code = 0; SMqRspObj* pRspObj = NULL; @@ -2432,6 +2461,10 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ return pRspObj; } + code = processWrapperData(pRspWrapper); + if (code != 0) { + goto END; + } SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp; taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; @@ -2508,33 +2541,6 @@ END: return pRspObj; } -static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){ - int32_t code = 0; - if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { - PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp) - pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data; - pRspWrapper->pollRsp.data = NULL; - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { - PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp) - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { - PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) - pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data; - pRspWrapper->pollRsp.data = NULL; - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { - PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { - PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp) - pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead); - pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)); - pRspWrapper->pollRsp.data = NULL; - } else { - tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType); - code = TSDB_CODE_TSC_INTERNAL_ERROR; - goto END; - } -END: - return code; -} static void* tmqHandleAllRsp(tmq_t* tmq) { tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); @@ -2554,14 +2560,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq) { } tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); - code = processWrapperData(pRspWrapper); - if (code == 0){ - if (pRspWrapper->code != 0) { - code = processMqRspError(tmq, pRspWrapper); - }else{ - returnVal = processMqRsp(tmq, pRspWrapper); - code = terrno; - } + if (pRspWrapper->code != 0) { + code = processMqRspError(tmq, pRspWrapper); + }else{ + returnVal = processMqRsp(tmq, pRspWrapper); + code = terrno; } tmqFreeRspWrapper(pRspWrapper); diff --git a/tests/system-test/7-tmq/taosx-performance.py b/tests/system-test/7-tmq/taosx-performance.py index 4471415e28..997c6ec096 100755 --- a/tests/system-test/7-tmq/taosx-performance.py +++ b/tests/system-test/7-tmq/taosx-performance.py @@ -31,6 +31,7 @@ taosd = "taosd" taosxLog = "taosx.log" taosxTimeout = 2 timeCost = [] +speedupStr = [] insertJson = '''{ "filetype": "insert", "cfgdir": "/etc/taos", @@ -270,7 +271,10 @@ if __name__ == "__main__": if i % 2 == 1 : print(f"opti cost:{float(timeCost[0]) - taosxTimeout}") print(f"old cost:{float(timeCost[1]) - taosxTimeout}") - print(str(paras[i]) + f" speedup:{(float(timeCost[1]) - taosxTimeout)/(float(timeCost[0]) - taosxTimeout)}\n\n\n") + tmp = str(paras[i]) + f" speedup:{(float(timeCost[1]) - taosxTimeout)/(float(timeCost[0]) - taosxTimeout)}" + speedupStr.append(tmp) + print(tmp + "\n\n\n") timeCost.clear() + print("performance result:\n" + str(speedupStr)) tdLog.info("run performance end")