diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 4e34a47902..bf88248259 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -144,7 +144,6 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - char *msg = pMsg->pData; int32_t msgSize = pMsg->len; int32_t msgType = pMsg->msgType; @@ -158,9 +157,9 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } case TDMT_VND_CREATE_TABLE_RSP: { SVCreateTbBatchRsp batchRsp = {0}; - if (msg) { + if (pMsg->pData) { SDecoder coder = {0}; - tDecoderInit(&coder, msg, msgSize); + tDecoderInit(&coder, pMsg->pData, msgSize); code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp); if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { SCH_LOCK(SCH_WRITE, &pJob->resLock); @@ -206,16 +205,16 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } SCH_ERR_JRET(rspCode); - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask)); break; } case TDMT_VND_DROP_TABLE_RSP: { SVDropTbBatchRsp batchRsp = {0}; - if (msg) { + if (pMsg->pData) { SDecoder coder = {0}; - tDecoderInit(&coder, msg, msgSize); + tDecoderInit(&coder, pMsg->pData, msgSize); code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp); if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) { @@ -232,16 +231,16 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } SCH_ERR_JRET(rspCode); - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask)); break; } case TDMT_VND_ALTER_TABLE_RSP: { SVAlterTbRsp rsp = {0}; - if (msg) { + if (pMsg->pData) { SDecoder coder = {0}; - tDecoderInit(&coder, msg, msgSize); + tDecoderInit(&coder, pMsg->pData, msgSize); code = tDecodeSVAlterTbRsp(&coder, &rsp); tDecoderClear(&coder); SCH_ERR_JRET(code); @@ -253,11 +252,11 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SCH_ERR_JRET(rspCode); - if (NULL == msg) { + if (NULL == pMsg->pData) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask)); break; @@ -265,10 +264,10 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD case TDMT_VND_SUBMIT_RSP: { SCH_ERR_JRET(rspCode); - if (msg) { + if (pMsg->pData) { SDecoder coder = {0}; SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp)); - tDecoderInit(&coder, msg, msgSize); + tDecoderInit(&coder, pMsg->pData, msgSize); code = tDecodeSSubmitRsp2(&coder, rsp); tDecoderClear(&coder); if (code) { @@ -319,7 +318,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } } - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask)); @@ -328,10 +327,10 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD case TDMT_VND_DELETE_RSP: { SCH_ERR_JRET(rspCode); - if (msg) { + if (pMsg->pData) { SDecoder coder = {0}; SVDeleteRsp rsp = {0}; - tDecoderInit(&coder, msg, msgSize); + tDecoderInit(&coder, pMsg->pData, msgSize); if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) { code = terrno; tDecoderClear(&coder); @@ -343,7 +342,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); } - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask)); @@ -352,7 +351,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { SCH_ERR_JRET(rspCode); - if (NULL == msg) { + if (NULL == pMsg->pData) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -365,7 +364,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } SQueryTableRsp rsp = {0}; - if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) { + if (tDeserializeSQueryTableRsp(pMsg->pData, msgSize, &rsp) < 0) { SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG); } @@ -376,7 +375,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask)); @@ -384,7 +383,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } case TDMT_SCH_EXPLAIN_RSP: { SCH_ERR_JRET(rspCode); - if (NULL == msg) { + if (NULL == pMsg->pData) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -399,20 +398,20 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD } SExplainRsp rsp = {0}; - if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) { + if (tDeserializeSExplainRsp(pMsg->pData, msgSize, &rsp)) { tFreeSExplainRsp(&rsp); SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp)); - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); break; } case TDMT_SCH_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP: { - code = schProcessFetchRsp(pJob, pTask, msg, rspCode); - msg = NULL; + code = schProcessFetchRsp(pJob, pTask, pMsg->pData, rspCode); + pMsg->pData = NULL; SCH_ERR_JRET(code); break; } @@ -435,7 +434,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD _return: - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -445,7 +444,6 @@ _return: int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; int32_t msgType = pMsg->msgType; - char *msg = pMsg->pData; bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); if (SCH_IS_QUERY_JOB(pJob)) { @@ -467,7 +465,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa _return: - taosMemoryFreeClear(msg); + taosMemoryFreeClear(pMsg->pData); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); }