Merge pull request #27723 from taosdata/fix/invalidPtrSetNULL
Fix/invalid ptr set null
This commit is contained in:
commit
e64d8204f2
|
@ -144,7 +144,6 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
|
|||
|
||||
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
char *msg = pMsg->pData;
|
||||
int32_t msgSize = pMsg->len;
|
||||
int32_t msgType = pMsg->msgType;
|
||||
|
||||
|
@ -158,9 +157,9 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
case TDMT_VND_CREATE_TABLE_RSP: {
|
||||
SVCreateTbBatchRsp batchRsp = {0};
|
||||
if (msg) {
|
||||
if (pMsg->pData) {
|
||||
SDecoder coder = {0};
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
tDecoderInit(&coder, pMsg->pData, msgSize);
|
||||
code = tDecodeSVCreateTbBatchRsp(&coder, &batchRsp);
|
||||
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
|
||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||
|
@ -206,16 +205,16 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_DROP_TABLE_RSP: {
|
||||
SVDropTbBatchRsp batchRsp = {0};
|
||||
if (msg) {
|
||||
if (pMsg->pData) {
|
||||
SDecoder coder = {0};
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
tDecoderInit(&coder, pMsg->pData, msgSize);
|
||||
code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
|
||||
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
|
||||
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
|
||||
|
@ -232,16 +231,16 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_ALTER_TABLE_RSP: {
|
||||
SVAlterTbRsp rsp = {0};
|
||||
if (msg) {
|
||||
if (pMsg->pData) {
|
||||
SDecoder coder = {0};
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
tDecoderInit(&coder, pMsg->pData, msgSize);
|
||||
code = tDecodeSVAlterTbRsp(&coder, &rsp);
|
||||
tDecoderClear(&coder);
|
||||
SCH_ERR_JRET(code);
|
||||
|
@ -253,11 +252,11 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
|
||||
if (NULL == msg) {
|
||||
if (NULL == pMsg->pData) {
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
break;
|
||||
|
@ -265,10 +264,10 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
case TDMT_VND_SUBMIT_RSP: {
|
||||
SCH_ERR_JRET(rspCode);
|
||||
|
||||
if (msg) {
|
||||
if (pMsg->pData) {
|
||||
SDecoder coder = {0};
|
||||
SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
tDecoderInit(&coder, pMsg->pData, msgSize);
|
||||
code = tDecodeSSubmitRsp2(&coder, rsp);
|
||||
tDecoderClear(&coder);
|
||||
if (code) {
|
||||
|
@ -319,7 +318,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
||||
|
@ -328,10 +327,10 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
case TDMT_VND_DELETE_RSP: {
|
||||
SCH_ERR_JRET(rspCode);
|
||||
|
||||
if (msg) {
|
||||
if (pMsg->pData) {
|
||||
SDecoder coder = {0};
|
||||
SVDeleteRsp rsp = {0};
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
tDecoderInit(&coder, pMsg->pData, msgSize);
|
||||
if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) {
|
||||
code = terrno;
|
||||
tDecoderClear(&coder);
|
||||
|
@ -343,7 +342,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
||||
|
@ -352,7 +351,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
case TDMT_SCH_QUERY_RSP:
|
||||
case TDMT_SCH_MERGE_QUERY_RSP: {
|
||||
SCH_ERR_JRET(rspCode);
|
||||
if (NULL == msg) {
|
||||
if (NULL == pMsg->pData) {
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -365,7 +364,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
|
||||
SQueryTableRsp rsp = {0};
|
||||
if (tDeserializeSQueryTableRsp(msg, msgSize, &rsp) < 0) {
|
||||
if (tDeserializeSQueryTableRsp(pMsg->pData, msgSize, &rsp) < 0) {
|
||||
SCH_TASK_ELOG("tDeserializeSQueryTableRsp failed, msgSize:%d", msgSize);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_MSG);
|
||||
}
|
||||
|
@ -376,7 +375,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
|
||||
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_ERR_JRET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
||||
|
@ -384,7 +383,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
case TDMT_SCH_EXPLAIN_RSP: {
|
||||
SCH_ERR_JRET(rspCode);
|
||||
if (NULL == msg) {
|
||||
if (NULL == pMsg->pData) {
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -399,20 +398,20 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
}
|
||||
|
||||
SExplainRsp rsp = {0};
|
||||
if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
|
||||
if (tDeserializeSExplainRsp(pMsg->pData, msgSize, &rsp)) {
|
||||
tFreeSExplainRsp(&rsp);
|
||||
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
case TDMT_SCH_MERGE_FETCH_RSP: {
|
||||
code = schProcessFetchRsp(pJob, pTask, msg, rspCode);
|
||||
msg = NULL;
|
||||
code = schProcessFetchRsp(pJob, pTask, pMsg->pData, rspCode);
|
||||
pMsg->pData = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
break;
|
||||
}
|
||||
|
@ -435,7 +434,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
|
@ -445,7 +444,6 @@ _return:
|
|||
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
int32_t msgType = pMsg->msgType;
|
||||
char *msg = pMsg->pData;
|
||||
|
||||
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
|
||||
if (SCH_IS_QUERY_JOB(pJob)) {
|
||||
|
@ -467,7 +465,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue