From a703b7024d830246ac11a10674afddddf2bbe6e1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 15:57:38 +0800 Subject: [PATCH 01/10] support select * from stable --- source/dnode/vnode/src/tq/tq.c | 6 +++++- source/util/src/tlog.c | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ade623a736..f3163fb515 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -916,6 +916,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t numOfCols = pHandle->pSchema->numOfCols; int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); + //TODO: stable case + if (colNumNeed > pSchemaWrapper->nCols) { + colNumNeed = pSchemaWrapper->nCols; + } + SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; @@ -928,7 +933,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { j++; } SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; - ASSERT(pColSchema->colId == colId); SColumnInfoData colInfo = {0}; int sz = numOfRows * pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index c7b1350591..5ce69f1c2e 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -92,7 +92,7 @@ int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; -int32_t tqDebugFlag = 131; +int32_t tqDebugFlag = 135; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135; From ff57f57c57ea0191a9da4cb0aeaaea3c68bbe13a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 17:19:12 +0800 Subject: [PATCH 02/10] refactor tmq --- include/client/taos.h | 63 ++++++++++++------ source/client/src/tmq.c | 144 ++++++++++++++++++++++------------------ 2 files changed, 121 insertions(+), 86 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 3db9046119..029aad8715 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -92,17 +92,6 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); -typedef struct tmq_t tmq_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; - -typedef struct tmq_message_t tmq_message_t; -typedef struct tmq_message_topic_t tmq_message_topic_t; -typedef struct tmq_message_tb_t tmq_message_tb_t; -typedef struct tmq_tb_iter_t tmq_tb_iter_t; -typedef struct tmq_message_col_t tmq_message_col_t; -typedef struct tmq_col_iter_t tmq_col_iter_t; - typedef struct TAOS_BIND { int buffer_type; void * buffer; @@ -205,27 +194,59 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); /* --------------------------TMQ INTERFACE------------------------------- */ -typedef struct tmq_resp_err_t tmq_resp_err_t; + +enum tmq_resp_err_t { + TMQ_RESP_ERR__SUCCESS = 0, + TMQ_RESP_ERR__FAIL = 1, +}; + +typedef enum tmq_resp_err_t tmq_resp_err_t; + +typedef struct tmq_t tmq_t; typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; +typedef struct tmq_message_t tmq_message_t; + typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); -DLL_EXPORT tmq_conf_t* tmq_conf_new(); - -DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); - DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); - DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen); -DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); - +/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ +DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); +#endif DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time); -DLL_EXPORT tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); +DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); +#endif +DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); +#endif +/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ + +enum tmq_conf_res_t { + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, +}; + +typedef enum tmq_conf_res_t tmq_conf_res_t; + +DLL_EXPORT tmq_conf_t* tmq_conf_new(); +DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf); +DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); +DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 632b1dddb4..a4af40285e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -26,6 +26,51 @@ #include "tpagedfile.h" #include "tref.h" +struct tmq_list_t { + int32_t cnt; + int32_t tot; + char* elems[]; +}; +struct tmq_topic_vgroup_t { + char* topic; + int32_t vgId; + int64_t commitOffset; +}; + +struct tmq_topic_vgroup_list_t { + int32_t cnt; + int32_t size; + tmq_topic_vgroup_t* elems; +}; + +struct tmq_conf_t { + char clientId[256]; + char groupId[256]; + /*char* ip;*/ + /*uint16_t port;*/ + tmq_commit_cb* commit_cb; +}; + +struct tmq_t { + char groupId[256]; + char clientId[256]; + SRWLatch lock; + int64_t consumerId; + int64_t epoch; + int64_t status; + tsem_t rspSem; + STscObj* pTscObj; + tmq_commit_cb* commit_cb; + int32_t nextTopicIdx; + SArray* clientTopics; //SArray + //stat + int64_t pollCnt; +}; + +struct tmq_message_t { + SMqConsumeRsp rsp; +}; + typedef struct SMqClientVg { // statistics int64_t pollCnt; @@ -47,83 +92,43 @@ typedef struct SMqClientTopic { SArray* vgs; //SArray } SMqClientTopic; + typedef struct SMqAskEpCbParam { tmq_t* tmq; int32_t wait; } SMqAskEpCbParam; -struct tmq_resp_err_t { - int32_t code; -}; - -struct tmq_topic_vgroup_t { - char* topic; - int32_t vgId; - int64_t commitOffset; -}; - -struct tmq_topic_vgroup_list_t { - int32_t cnt; - int32_t size; - tmq_topic_vgroup_t* elems; -}; - typedef struct SMqConsumeCbParam { tmq_t* tmq; SMqClientVg* pVg; tmq_message_t** retMsg; } SMqConsumeCbParam; -struct tmq_conf_t { - char clientId[256]; - char groupId[256]; - char* ip; - uint16_t port; - tmq_commit_cb* commit_cb; -}; - -struct tmq_message_t { - SMqConsumeRsp rsp; -}; - +typedef struct SMqSubscribeCbParam { + tmq_t* tmq; + tsem_t rspSem; + tmq_resp_err_t rspErr; +} SMqSubscribeCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); return conf; } -int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { +void tmq_conf_destroy(tmq_conf_t* conf) { + if(conf) free(conf); +} + +tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { if (strcmp(key, "group.id") == 0) { strcpy(conf->groupId, value); } if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); } - return 0; + return TMQ_CONF_OK; } -struct tmq_t { - char groupId[256]; - char clientId[256]; - SRWLatch lock; - int64_t consumerId; - int64_t epoch; - int64_t status; - tsem_t rspSem; - STscObj* pTscObj; - tmq_commit_cb* commit_cb; - int32_t nextTopicIdx; - SArray* clientTopics; //SArray - //stat - int64_t pollCnt; -}; - -struct tmq_list_t { - int32_t cnt; - int32_t tot; - char* elems[]; -}; - tmq_list_t* tmq_list_new() { tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); if (ptr == NULL) { @@ -141,6 +146,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { return 0; } +int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; + pParam->rspErr = code; + tsem_post(&pParam->rspSem); + return 0; +} tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); @@ -161,7 +172,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } -TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { +tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj *pRequest = NULL; int32_t sz = topic_list->cnt; //destroy ex @@ -219,27 +230,31 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc sqlObj"); } + SMqSubscribeCbParam param = { + .rspErr = TMQ_RESP_ERR__SUCCESS, + .tmq = tmq + }; + tsem_init(¶m.rspSem, 0, 0); + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - /*sendInfo->fp*/ + sendInfo->param = ¶m; + sendInfo->fp = tmqSubscribeCb; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(&pRequest->body.rspSem); + tsem_wait(¶m.rspSem); + tsem_destroy(¶m.rspSem); _return: /*if (sendInfo != NULL) {*/ /*destroySendMsgInfo(sendInfo);*/ /*}*/ - if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { - pRequest->code = terrno; - } - - return pRequest; + return param.rspErr; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { @@ -611,10 +626,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_message = NULL; int64_t status = atomic_load_64(&tmq->status); - tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); + tmqAsyncAskEp(tmq, taosArrayGetSize(tmq->clientTopics)); /*if (blocking_time < 0) blocking_time = 500;*/ - blocking_time = 1000; + blocking_time = 1; if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); @@ -674,16 +689,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } -tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { +tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { SMqConsumeReq req = {0}; - return NULL; + return 0; } void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; } - static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { assert(pMsgBody != NULL); tfree(pMsgBody->msgInfo.pData); From 2550d42cd3cf7d870efe46bbfbcc397d26b78ef5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 17:56:12 +0800 Subject: [PATCH 03/10] fix mem leak --- include/common/common.h | 18 ++++++++++++++++++ source/client/test/clientTests.cpp | 2 +- source/dnode/vnode/src/tq/tq.c | 7 +++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/include/common/common.h b/include/common/common.h index bbde65b48f..092a666e73 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -174,6 +174,24 @@ static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { return buf; } +static FORCE_INLINE void destroySSDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + //int32_t numOfOutput = pBlock->info.numOfCols; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + for(int32_t i = 0; i < sz; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + tfree(pBlock); +} + + //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index c42b023d12..7ef06397f0 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -606,7 +606,7 @@ TEST(testCase, create_topic_stb_Test) { taos_free_result(pRes); - char* sql = "select ts, k from st1"; + char* sql = "select * from st1"; pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f3163fb515..49d15e6148 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -785,6 +785,13 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } void* abuf = buf; tEncodeSMqConsumeRsp(&abuf, &rsp); + if (rsp.pBlockData) { + for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) { + SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i); + destroySSDataBlock(pBlock); + } + free(rsp.pBlockData); + } pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; From 688a85c9c3076bee5ab148aad6e31592fd944afd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 18:14:38 +0800 Subject: [PATCH 04/10] fix mem leak --- source/dnode/mnode/impl/src/mndSubscribe.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index fa8c78f4b1..62083a3d03 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -122,7 +122,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { if (changed || found) { SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); - sdbWriteNotFree(pMnode->pSdb, pRaw); + sdbWrite(pMnode->pSdb, pRaw); } mndReleaseSubscribe(pMnode, pSub); } @@ -434,6 +434,7 @@ SUB_DECODE_OVER: if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); // TODO free subscribeobj + tfree(buf); tfree(pRow); return NULL; } From 108fd4cfc36926fc25cda1eb39d714c9af679b5f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 18:39:34 +0800 Subject: [PATCH 05/10] fix mem leak --- source/client/src/tmq.c | 4 +++- source/dnode/mnode/impl/inc/mndDef.h | 14 ++++++++++++++ source/dnode/mnode/impl/src/mndSubscribe.c | 1 + 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a4af40285e..6603c4019f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -565,6 +565,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { + goto END; tscError("failed to malloc get subscribe ep buf"); } buf->consumerId = htobe64(tmq->consumerId); @@ -572,6 +573,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); if (pRequest == NULL) { + goto END; tscError("failed to malloc subscribe ep request"); } @@ -579,7 +581,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { - free(buf); goto END; } pParam->tmq = tmq; @@ -596,6 +597,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: + tfree(buf); if (wait) tsem_wait(&tmq->rspSem); return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9cce25ed6a..1b7383bee1 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu return buf; } +static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) { + if (pConsumerEp) { + tfree(pConsumerEp->qmsg); + } +} + // unit for rebalance typedef struct SMqSubscribeObj { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -571,6 +577,14 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return buf; } +static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { + if (pSub->availConsumer) taosArrayDestroy(pSub->availConsumer); + if (pSub->assigned) taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->unassignedVg) taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->idleConsumer) taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->lostConsumer) taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); +} + typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 62083a3d03..0da5b5f27e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -449,6 +449,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { mTrace("subscribe:%s, perform delete action", pSub->key); + tDeleteSMqSubscribeObj(pSub); return 0; } From 19892cfaf78612e3c8d78899764d616ab0d346a4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 18:58:22 +0800 Subject: [PATCH 06/10] fix double free --- source/client/src/tmq.c | 1 - source/dnode/mnode/impl/inc/mndDef.h | 33 ++++++++++++++++------ source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 6603c4019f..f9ade7287f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -597,7 +597,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: - tfree(buf); if (wait) tsem_wait(&tmq->rspSem); return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1b7383bee1..a9a633cf63 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -526,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->assigned, &cEp); } @@ -539,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->lostConsumer, &cEp); } @@ -553,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->idleConsumer, &cEp); } @@ -569,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->unassignedVg, &cEp); } @@ -578,11 +578,26 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) } static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { - if (pSub->availConsumer) taosArrayDestroy(pSub->availConsumer); - if (pSub->assigned) taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); - if (pSub->unassignedVg) taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); - if (pSub->idleConsumer) taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); - if (pSub->lostConsumer) taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + if (pSub->availConsumer) { + taosArrayDestroy(pSub->availConsumer); + pSub->availConsumer = NULL; + } + if (pSub->assigned) { + taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->assigned = NULL; + } + if (pSub->unassignedVg) { + taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->unassignedVg = NULL; + } + if (pSub->idleConsumer) { + taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->idleConsumer = NULL; + } + if (pSub->lostConsumer) { + taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->lostConsumer = NULL; + } } typedef struct SMqCGroup { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0da5b5f27e..47d7e14e45 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -292,7 +292,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas return -1; } - SMqConsumerEp CEp; + SMqConsumerEp CEp = {0}; CEp.status = 0; CEp.consumerId = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; From 8f24333d7d3594401ebb245f620194f4adb3b082 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 19:10:18 +0800 Subject: [PATCH 07/10] fix mem leak --- include/common/tmsg.h | 8 ++++++++ source/client/src/tmq.c | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 2 ++ source/dnode/mnode/impl/src/mndSubscribe.c | 3 ++- 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 154973e20f..a5b9d356f8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1744,6 +1744,10 @@ typedef struct SMqCMGetSubEpRsp { SArray* topics; // SArray } SMqCMGetSubEpRsp; +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { + taosArrayDestroy(pSubTopicEp->vgs); +} + static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); @@ -1757,6 +1761,10 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { return buf; } +static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { + taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); +} + static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f9ade7287f..ccdd1e64cb 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -557,6 +557,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { if (pParam->wait) { tsem_post(&tmq->rspSem); } + tDeleteSMqCMGetSubEpRsp(&rsp); free(pParam); return 0; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e642b578fa..6fd61baee3 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -75,6 +75,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_SUCCESS; CM_ENCODE_OVER: + tfree(buf); if (terrno != 0) { mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -117,6 +118,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; CM_DECODE_OVER: + tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); tfree(pRow); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 47d7e14e45..8e62d48e61 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -388,6 +388,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { terrno = TSDB_CODE_SUCCESS; SUB_ENCODE_OVER: + tfree(buf); if (terrno != 0) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -431,10 +432,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; SUB_DECODE_OVER: + tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); // TODO free subscribeobj - tfree(buf); tfree(pRow); return NULL; } From ee783080f341580cff84005cd90c60b28f474cc5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 19:22:48 +0800 Subject: [PATCH 08/10] fix mem leak --- include/common/common.h | 15 ++++++++++++++- source/client/src/tmq.c | 4 ++++ source/dnode/vnode/src/tq/tq.c | 2 +- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/include/common/common.h b/include/common/common.h index 092a666e73..16a691c338 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -174,7 +174,7 @@ static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { return buf; } -static FORCE_INLINE void destroySSDataBlock(SSDataBlock* pBlock) { +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { if (pBlock == NULL) { return; } @@ -192,6 +192,19 @@ static FORCE_INLINE void destroySSDataBlock(SSDataBlock* pBlock) { } +static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) { + if (pRsp->schemas) { + if (pRsp->schemas->nCols) { + tfree(pRsp->schemas->pSchema); + } + free(pRsp->schemas); + } + for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { + SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); + tDeleteSSDataBlock(pDataBlock); + } +} + //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ccdd1e64cb..26177f466e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -456,6 +456,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { if (code == -1) { printf("msg discard\n"); + free(param); return 0; } char pBuf[128]; @@ -465,6 +466,7 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqConsumeRsp(pMsg->pData, &rsp); if (rsp.numOfTopics == 0) { /*printf("no data\n");*/ + free(param); return 0; } int32_t colNum = rsp.schemas->nCols; @@ -501,6 +503,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { printf("\n"); } } + tDeleteSMqConsumeRsp(&rsp); + free(param); /*printf("\n-----msg end------\n");*/ return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 49d15e6148..7299710586 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -788,7 +788,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (rsp.pBlockData) { for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) { SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i); - destroySSDataBlock(pBlock); + tDeleteSSDataBlock(pBlock); } free(rsp.pBlockData); } From 5aa0f80c913d78a1ea6a538c5097e14f5a2c4ef0 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 19:29:18 +0800 Subject: [PATCH 09/10] fix mem leak --- include/common/common.h | 12 +++++++----- source/dnode/vnode/src/tq/tq.c | 12 +++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/include/common/common.h b/include/common/common.h index 16a691c338..bad9adedf5 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -188,7 +188,7 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { taosArrayDestroy(pBlock->pDataBlock); tfree(pBlock->pBlockAgg); - tfree(pBlock); + //tfree(pBlock); } @@ -199,10 +199,12 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) { } free(pRsp->schemas); } - for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { - SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); - tDeleteSSDataBlock(pDataBlock); - } + taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock); + pRsp->pBlockData = NULL; + //for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { + //SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); + //tDeleteSSDataBlock(pDataBlock); + //} } //====================================================================================================================== diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7299710586..139120a46d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -786,11 +786,13 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { void* abuf = buf; tEncodeSMqConsumeRsp(&abuf, &rsp); if (rsp.pBlockData) { - for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) { - SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i); - tDeleteSSDataBlock(pBlock); - } - free(rsp.pBlockData); + taosArrayDestroyEx(rsp.pBlockData, (void(*)(void*))tDeleteSSDataBlock); + rsp.pBlockData = NULL; + /*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/ + /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/ + /*tDeleteSSDataBlock(pBlock);*/ + /*}*/ + /*taosArrayDestroy(rsp.pBlockData);*/ } pMsg->pCont = buf; pMsg->contLen = tlen; From a1d143384ab963ec2fa61ef8781b1742a318a8c1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 19:58:56 +0800 Subject: [PATCH 10/10] fix mem leak --- include/common/common.h | 4 ++-- source/client/src/tmq.c | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 4 ++-- source/dnode/mnode/impl/src/mndTopic.c | 7 +++---- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/include/common/common.h b/include/common/common.h index bad9adedf5..31f905d47f 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -119,7 +119,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedI32(buf, &sz); pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); for (int32_t i = 0; i < sz; i++) { - SColumnInfoData data; + SColumnInfoData data = {0}; buf = taosDecodeFixedI16(buf, &data.info.colId); buf = taosDecodeFixedI16(buf, &data.info.type); buf = taosDecodeFixedI16(buf, &data.info.bytes); @@ -167,7 +167,7 @@ static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &sz); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); for (int32_t i = 0; i < sz; i++) { - SSDataBlock block; + SSDataBlock block = {0}; tDecodeDataBlock(buf, &block); taosArrayPush(pRsp->pBlockData, &block); } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 26177f466e..421c8fae30 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -462,7 +462,7 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { char pBuf[128]; SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; SMqClientVg* pVg = pParam->pVg; - SMqConsumeRsp rsp; + SMqConsumeRsp rsp = {0}; tDecodeSMqConsumeRsp(pMsg->pData, &rsp); if (rsp.numOfTopics == 0) { /*printf("no data\n");*/ diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8e62d48e61..47bdefea3d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -69,7 +69,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; - SMqCMGetSubEpRsp rsp; + SMqCMGetSubEpRsp rsp = {0}; int64_t consumerId = be64toh(pReq->consumerId); int64_t currentTs = taosGetTimestampMs(); @@ -134,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } void *abuf = buf; tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); - // TODO: free rsp + tDeleteSMqCMGetSubEpRsp(&rsp); pMsg->pCont = buf; pMsg->contLen = tlen; return 0; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 6b4cb4ba59..33def69a68 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -240,15 +240,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); topicObj.dbUid = pDb->uid; topicObj.version = 1; - topicObj.sql = strdup(pCreate->sql); - topicObj.physicalPlan = strdup(pCreate->physicalPlan); - topicObj.logicalPlan = strdup(pCreate->logicalPlan); + topicObj.sql = pCreate->sql; + topicObj.physicalPlan = pCreate->physicalPlan; + topicObj.logicalPlan = pCreate->logicalPlan; topicObj.sqlLen = strlen(pCreate->sql); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; - // TODO: replace with trans to support recovery return sdbWrite(pMnode->pSdb, pTopicRaw); }