diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6225cf703c..6eb7abe0eb 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -939,7 +939,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SFieldWithOptions)); for (int32_t i = 0; i < pReq.numOfColumns; ++i) { SField *pField = taosArrayGet(pColumns, i); - SFieldWithOptions fieldWithOption; + SFieldWithOptions fieldWithOption = {0}; setFieldWithOptions(&fieldWithOption, pField); setDefaultOptionsForField(&fieldWithOption); taosArrayPush(pReq.pColumns, &fieldWithOption); diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c index f715f32556..bc0e560178 100644 --- a/source/client/src/clientSmlTelnet.c +++ b/source/client/src/clientSmlTelnet.c @@ -233,7 +233,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine SSmlKv kvTs = {0}; smlBuildTsKv(&kvTs, ts); - if (needConverTime) { + if (needConverTime && info->currSTableMeta != NULL) { kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision); } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fde1b12be0..8a44227852 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -872,7 +872,10 @@ void tmqSendHbReq(void* param, void* tmrId) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if (code != 0) { + tscError("tmqSendHbReq asyncSendMsgToServer failed"); + } OVER: tDestroySMqHbReq(&req); @@ -1240,12 +1243,15 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(buf); goto FAIL; } SMqSubscribeCbParam param = {.rspErr = 0}; if (tsem_init(¶m.rspSem, 0, 0) != 0) { code = TSDB_CODE_TSC_INTERNAL_ERROR; + taosMemoryFree(buf); + taosMemoryFree(sendInfo); goto FAIL; } @@ -1265,10 +1271,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - // avoid double free if msg is sent - buf = NULL; - sendInfo = NULL; - tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); @@ -1307,8 +1309,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); - taosMemoryFree(buf); - taosMemoryFree(sendInfo); return code; } @@ -1381,7 +1381,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if(pMsg->pData == NULL){ tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId); - goto FAIL; + code = TSDB_CODE_TSC_INTERNAL_ERROR; + goto END; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; @@ -1410,7 +1411,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1426,7 +1427,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1437,7 +1438,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1795,6 +1796,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(msg); goto FAIL; } @@ -1806,6 +1808,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(msg); goto FAIL; } @@ -1832,7 +1835,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p return 0; FAIL: - taosMemoryFreeClear(msg); return tmqPollCb(pParam, NULL, code); } @@ -1938,8 +1940,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { terrno = pRspWrapper->code; tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code)); - taosFreeQitem(pRspWrapper); - return NULL; } else { if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform askEp(tmq, NULL, false, true); @@ -2653,6 +2653,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); code = TSDB_CODE_INVALID_PARA; + taosMemoryFree(pReq); goto FAIL; } @@ -2660,6 +2661,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { if (pParam == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pReq); goto FAIL; } @@ -2670,6 +2672,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pReq); goto FAIL; } @@ -2691,8 +2694,6 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { } FAIL: - taosMemoryFreeClear(pParam); - taosMemoryFreeClear(pReq); askEpCb(pParam, NULL, code); } @@ -2799,7 +2800,6 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); return 0; } @@ -2903,8 +2903,6 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); if (code != 0) { - taosMemoryFree(buf); - taosMemoryFree(sendInfo); tsem_destroy(&pParam->sem); taosMemoryFree(pParam); return code; @@ -3161,6 +3159,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; + sendInfo->paramFreeFp = taosMemoryFree; sendInfo->fp = tmqGetWalInfoCb; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; @@ -3172,8 +3171,6 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); if (code != 0) { - taosMemoryFree(pParam); - taosMemoryFree(msg); goto end; } } @@ -3329,8 +3326,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int64_t transporterId = 0; code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (code != 0) { - taosMemoryFree(msg); - taosMemoryFree(sendInfo); tsem_destroy(&pParam->sem); taosMemoryFree(pParam); return code;