diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 71e571d4dd..eb41ad039a 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -20,7 +20,7 @@ #include #include "taos.h" -static int running = 1; +static int running = 1; static int32_t msg_process(TAOS_RES* msg) { char buf[1024]; @@ -40,8 +40,8 @@ static int32_t msg_process(TAOS_RES* msg) { TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - //int32_t* length = taos_fetch_lengths(msg); - int32_t precision = taos_result_precision(msg); + // int32_t* length = taos_fetch_lengths(msg); + int32_t precision = taos_result_precision(msg); rows++; taos_print_row(buf, row, fields, numOfFields); printf("precision: %d, row content: %s\n", precision, buf); @@ -62,14 +62,12 @@ static int32_t init_env() { pRes = taos_query(pConn, "drop topic topicname"); if (taos_errno(pRes) != 0) { printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); - return -1; } taos_free_result(pRes); pRes = taos_query(pConn, "drop database if exists tmqdb"); if (taos_errno(pRes) != 0) { printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); - return -1; } taos_free_result(pRes); @@ -77,7 +75,7 @@ static int32_t init_env() { pRes = taos_query(pConn, "create database tmqdb precision 'ns'"); if (taos_errno(pRes) != 0) { printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); @@ -87,7 +85,7 @@ static int32_t init_env() { pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); if (taos_errno(pRes) != 0) { printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); @@ -96,28 +94,28 @@ static int32_t init_env() { pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); @@ -126,33 +124,37 @@ static int32_t init_env() { pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; + goto END; } taos_free_result(pRes); - taos_close(pConn); return 0; + +END: + taos_free_result(pRes); + taos_close(pConn); + return -1; } int32_t create_topic() { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 350ecdd373..c63ddea25d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -438,6 +438,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); + taosMemoryFree(pBuf->pEpSet); /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, * pOffset->version);*/ @@ -724,7 +725,10 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData); + if (pMsg) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } return 0; } @@ -869,6 +873,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; + + taosMemoryFree(pMsg->pEpSet); tsem_post(&pParam->rspSem); return 0; } @@ -1166,6 +1172,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != 0) { tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr()); if (pMsg->pData) taosMemoryFree(pMsg->pData); + if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet); + if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); goto CREATE_MSG_FAIL; @@ -1365,6 +1373,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pParam); } taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; return -1; } @@ -1416,6 +1425,8 @@ END: } else { taosMemoryFree(pParam); } + + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); return code; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index c23b461b47..b6de9383d7 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -456,6 +456,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { if (head->isHbParam) { taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6ad126162e..d144a76eb0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -599,6 +599,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); conn->refId = exh->refId; + + if (conn->refId == -1) { + taosMemoryFree(exh); + } return 0; }