diff --git a/include/client/taos.h b/include/client/taos.h index 3db9046119..029aad8715 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -92,17 +92,6 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); -typedef struct tmq_t tmq_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; - -typedef struct tmq_message_t tmq_message_t; -typedef struct tmq_message_topic_t tmq_message_topic_t; -typedef struct tmq_message_tb_t tmq_message_tb_t; -typedef struct tmq_tb_iter_t tmq_tb_iter_t; -typedef struct tmq_message_col_t tmq_message_col_t; -typedef struct tmq_col_iter_t tmq_col_iter_t; - typedef struct TAOS_BIND { int buffer_type; void * buffer; @@ -205,27 +194,59 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); /* --------------------------TMQ INTERFACE------------------------------- */ -typedef struct tmq_resp_err_t tmq_resp_err_t; + +enum tmq_resp_err_t { + TMQ_RESP_ERR__SUCCESS = 0, + TMQ_RESP_ERR__FAIL = 1, +}; + +typedef enum tmq_resp_err_t tmq_resp_err_t; + +typedef struct tmq_t tmq_t; typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; +typedef struct tmq_message_t tmq_message_t; + typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); -DLL_EXPORT tmq_conf_t* tmq_conf_new(); - -DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); - DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); - DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen); -DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); - +/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ +DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); +#endif DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time); -DLL_EXPORT tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq); +DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); +DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); +#endif +DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async); +#if 0 +DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); +#endif +/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ + +enum tmq_conf_res_t { + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, +}; + +typedef enum tmq_conf_res_t tmq_conf_res_t; + +DLL_EXPORT tmq_conf_t* tmq_conf_new(); +DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf); +DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); +DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); #ifdef __cplusplus } diff --git a/include/common/common.h b/include/common/common.h index bbde65b48f..31f905d47f 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -119,7 +119,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedI32(buf, &sz); pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); for (int32_t i = 0; i < sz; i++) { - SColumnInfoData data; + SColumnInfoData data = {0}; buf = taosDecodeFixedI16(buf, &data.info.colId); buf = taosDecodeFixedI16(buf, &data.info.type); buf = taosDecodeFixedI16(buf, &data.info.bytes); @@ -167,13 +167,46 @@ static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &sz); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); for (int32_t i = 0; i < sz; i++) { - SSDataBlock block; + SSDataBlock block = {0}; tDecodeDataBlock(buf, &block); taosArrayPush(pRsp->pBlockData, &block); } return buf; } +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + //int32_t numOfOutput = pBlock->info.numOfCols; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + for(int32_t i = 0; i < sz; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + //tfree(pBlock); +} + + +static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) { + if (pRsp->schemas) { + if (pRsp->schemas->nCols) { + tfree(pRsp->schemas->pSchema); + } + free(pRsp->schemas); + } + taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock); + pRsp->pBlockData = NULL; + //for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { + //SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); + //tDeleteSSDataBlock(pDataBlock); + //} +} + //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index db184b18cd..371f8cfca2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1752,6 +1752,10 @@ typedef struct SMqCMGetSubEpRsp { SArray* topics; // SArray } SMqCMGetSubEpRsp; +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { + taosArrayDestroy(pSubTopicEp->vgs); +} + static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); @@ -1765,6 +1769,10 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { return buf; } +static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { + taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); +} + static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 632b1dddb4..421c8fae30 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -26,6 +26,51 @@ #include "tpagedfile.h" #include "tref.h" +struct tmq_list_t { + int32_t cnt; + int32_t tot; + char* elems[]; +}; +struct tmq_topic_vgroup_t { + char* topic; + int32_t vgId; + int64_t commitOffset; +}; + +struct tmq_topic_vgroup_list_t { + int32_t cnt; + int32_t size; + tmq_topic_vgroup_t* elems; +}; + +struct tmq_conf_t { + char clientId[256]; + char groupId[256]; + /*char* ip;*/ + /*uint16_t port;*/ + tmq_commit_cb* commit_cb; +}; + +struct tmq_t { + char groupId[256]; + char clientId[256]; + SRWLatch lock; + int64_t consumerId; + int64_t epoch; + int64_t status; + tsem_t rspSem; + STscObj* pTscObj; + tmq_commit_cb* commit_cb; + int32_t nextTopicIdx; + SArray* clientTopics; //SArray + //stat + int64_t pollCnt; +}; + +struct tmq_message_t { + SMqConsumeRsp rsp; +}; + typedef struct SMqClientVg { // statistics int64_t pollCnt; @@ -47,83 +92,43 @@ typedef struct SMqClientTopic { SArray* vgs; //SArray } SMqClientTopic; + typedef struct SMqAskEpCbParam { tmq_t* tmq; int32_t wait; } SMqAskEpCbParam; -struct tmq_resp_err_t { - int32_t code; -}; - -struct tmq_topic_vgroup_t { - char* topic; - int32_t vgId; - int64_t commitOffset; -}; - -struct tmq_topic_vgroup_list_t { - int32_t cnt; - int32_t size; - tmq_topic_vgroup_t* elems; -}; - typedef struct SMqConsumeCbParam { tmq_t* tmq; SMqClientVg* pVg; tmq_message_t** retMsg; } SMqConsumeCbParam; -struct tmq_conf_t { - char clientId[256]; - char groupId[256]; - char* ip; - uint16_t port; - tmq_commit_cb* commit_cb; -}; - -struct tmq_message_t { - SMqConsumeRsp rsp; -}; - +typedef struct SMqSubscribeCbParam { + tmq_t* tmq; + tsem_t rspSem; + tmq_resp_err_t rspErr; +} SMqSubscribeCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); return conf; } -int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { +void tmq_conf_destroy(tmq_conf_t* conf) { + if(conf) free(conf); +} + +tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { if (strcmp(key, "group.id") == 0) { strcpy(conf->groupId, value); } if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); } - return 0; + return TMQ_CONF_OK; } -struct tmq_t { - char groupId[256]; - char clientId[256]; - SRWLatch lock; - int64_t consumerId; - int64_t epoch; - int64_t status; - tsem_t rspSem; - STscObj* pTscObj; - tmq_commit_cb* commit_cb; - int32_t nextTopicIdx; - SArray* clientTopics; //SArray - //stat - int64_t pollCnt; -}; - -struct tmq_list_t { - int32_t cnt; - int32_t tot; - char* elems[]; -}; - tmq_list_t* tmq_list_new() { tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); if (ptr == NULL) { @@ -141,6 +146,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { return 0; } +int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; + pParam->rspErr = code; + tsem_post(&pParam->rspSem); + return 0; +} tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); @@ -161,7 +172,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return pTmq; } -TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { +tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj *pRequest = NULL; int32_t sz = topic_list->cnt; //destroy ex @@ -219,27 +230,31 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc sqlObj"); } + SMqSubscribeCbParam param = { + .rspErr = TMQ_RESP_ERR__SUCCESS, + .tmq = tmq + }; + tsem_init(¶m.rspSem, 0, 0); + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - /*sendInfo->fp*/ + sendInfo->param = ¶m; + sendInfo->fp = tmqSubscribeCb; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(&pRequest->body.rspSem); + tsem_wait(¶m.rspSem); + tsem_destroy(¶m.rspSem); _return: /*if (sendInfo != NULL) {*/ /*destroySendMsgInfo(sendInfo);*/ /*}*/ - if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { - pRequest->code = terrno; - } - - return pRequest; + return param.rspErr; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { @@ -441,15 +456,17 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { if (code == -1) { printf("msg discard\n"); + free(param); return 0; } char pBuf[128]; SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; SMqClientVg* pVg = pParam->pVg; - SMqConsumeRsp rsp; + SMqConsumeRsp rsp = {0}; tDecodeSMqConsumeRsp(pMsg->pData, &rsp); if (rsp.numOfTopics == 0) { /*printf("no data\n");*/ + free(param); return 0; } int32_t colNum = rsp.schemas->nCols; @@ -486,6 +503,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { printf("\n"); } } + tDeleteSMqConsumeRsp(&rsp); + free(param); /*printf("\n-----msg end------\n");*/ return 0; } @@ -542,6 +561,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { if (pParam->wait) { tsem_post(&tmq->rspSem); } + tDeleteSMqCMGetSubEpRsp(&rsp); free(pParam); return 0; } @@ -550,6 +570,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { + goto END; tscError("failed to malloc get subscribe ep buf"); } buf->consumerId = htobe64(tmq->consumerId); @@ -557,6 +578,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); if (pRequest == NULL) { + goto END; tscError("failed to malloc subscribe ep request"); } @@ -564,7 +586,6 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { - free(buf); goto END; } pParam->tmq = tmq; @@ -611,10 +632,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_message = NULL; int64_t status = atomic_load_64(&tmq->status); - tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); + tmqAsyncAskEp(tmq, taosArrayGetSize(tmq->clientTopics)); /*if (blocking_time < 0) blocking_time = 500;*/ - blocking_time = 1000; + blocking_time = 1; if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); @@ -674,16 +695,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } -tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { +tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { SMqConsumeReq req = {0}; - return NULL; + return 0; } void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; } - static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { assert(pMsgBody != NULL); tfree(pMsgBody->msgInfo.pData); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index c42b023d12..7ef06397f0 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -606,7 +606,7 @@ TEST(testCase, create_topic_stb_Test) { taos_free_result(pRes); - char* sql = "select ts, k from st1"; + char* sql = "select * from st1"; pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9cce25ed6a..a9a633cf63 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -393,6 +393,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu return buf; } +static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) { + if (pConsumerEp) { + tfree(pConsumerEp->qmsg); + } +} + // unit for rebalance typedef struct SMqSubscribeObj { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -520,7 +526,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->assigned, &cEp); } @@ -533,7 +539,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->lostConsumer, &cEp); } @@ -547,7 +553,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->idleConsumer, &cEp); } @@ -563,7 +569,7 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp; + SMqConsumerEp cEp = {0}; buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->unassignedVg, &cEp); } @@ -571,6 +577,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) return buf; } +static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { + if (pSub->availConsumer) { + taosArrayDestroy(pSub->availConsumer); + pSub->availConsumer = NULL; + } + if (pSub->assigned) { + taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->assigned = NULL; + } + if (pSub->unassignedVg) { + taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->unassignedVg = NULL; + } + if (pSub->idleConsumer) { + taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->idleConsumer = NULL; + } + if (pSub->lostConsumer) { + taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); + pSub->lostConsumer = NULL; + } +} + typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e642b578fa..6fd61baee3 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -75,6 +75,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_SUCCESS; CM_ENCODE_OVER: + tfree(buf); if (terrno != 0) { mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -117,6 +118,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; CM_DECODE_OVER: + tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); tfree(pRow); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index fa8c78f4b1..47bdefea3d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -69,7 +69,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; - SMqCMGetSubEpRsp rsp; + SMqCMGetSubEpRsp rsp = {0}; int64_t consumerId = be64toh(pReq->consumerId); int64_t currentTs = taosGetTimestampMs(); @@ -122,7 +122,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { if (changed || found) { SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); - sdbWriteNotFree(pMnode->pSdb, pRaw); + sdbWrite(pMnode->pSdb, pRaw); } mndReleaseSubscribe(pMnode, pSub); } @@ -134,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } void *abuf = buf; tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); - // TODO: free rsp + tDeleteSMqCMGetSubEpRsp(&rsp); pMsg->pCont = buf; pMsg->contLen = tlen; return 0; @@ -292,7 +292,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas return -1; } - SMqConsumerEp CEp; + SMqConsumerEp CEp = {0}; CEp.status = 0; CEp.consumerId = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; @@ -388,6 +388,7 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { terrno = TSDB_CODE_SUCCESS; SUB_ENCODE_OVER: + tfree(buf); if (terrno != 0) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -431,6 +432,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; SUB_DECODE_OVER: + tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); // TODO free subscribeobj @@ -448,6 +450,7 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { mTrace("subscribe:%s, perform delete action", pSub->key); + tDeleteSMqSubscribeObj(pSub); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 6b4cb4ba59..33def69a68 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -240,15 +240,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); topicObj.dbUid = pDb->uid; topicObj.version = 1; - topicObj.sql = strdup(pCreate->sql); - topicObj.physicalPlan = strdup(pCreate->physicalPlan); - topicObj.logicalPlan = strdup(pCreate->logicalPlan); + topicObj.sql = pCreate->sql; + topicObj.physicalPlan = pCreate->physicalPlan; + topicObj.logicalPlan = pCreate->logicalPlan; topicObj.sqlLen = strlen(pCreate->sql); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; - // TODO: replace with trans to support recovery return sdbWrite(pMnode->pSdb, pTopicRaw); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ade623a736..139120a46d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -785,6 +785,15 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } void* abuf = buf; tEncodeSMqConsumeRsp(&abuf, &rsp); + if (rsp.pBlockData) { + taosArrayDestroyEx(rsp.pBlockData, (void(*)(void*))tDeleteSSDataBlock); + rsp.pBlockData = NULL; + /*for (int i = 0; i < taosArrayGetSize(rsp.pBlockData); i++) {*/ + /*SSDataBlock* pBlock = taosArrayGet(rsp.pBlockData, i);*/ + /*tDeleteSSDataBlock(pBlock);*/ + /*}*/ + /*taosArrayDestroy(rsp.pBlockData);*/ + } pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; @@ -916,6 +925,11 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t numOfCols = pHandle->pSchema->numOfCols; int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); + //TODO: stable case + if (colNumNeed > pSchemaWrapper->nCols) { + colNumNeed = pSchemaWrapper->nCols; + } + SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; @@ -928,7 +942,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { j++; } SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; - ASSERT(pColSchema->colId == colId); SColumnInfoData colInfo = {0}; int sz = numOfRows * pColSchema->bytes; colInfo.info.bytes = pColSchema->bytes; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index c7b1350591..5ce69f1c2e 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -92,7 +92,7 @@ int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; -int32_t tqDebugFlag = 131; +int32_t tqDebugFlag = 135; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135;