Merge pull request #11911 from taosdata/feature/tq
fix: client memory leak
This commit is contained in:
commit
a196de93f1
|
@ -2385,7 +2385,7 @@ typedef struct {
|
|||
int64_t consumerId;
|
||||
int64_t waitTime;
|
||||
int64_t currentOffset;
|
||||
} SMqPollReqV2;
|
||||
} SMqPollReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
|
|
|
@ -246,6 +246,12 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
|||
|
||||
pResInfo->numOfCols = numOfCols;
|
||||
// TODO handle memory leak
|
||||
if (pResInfo->fields != NULL) {
|
||||
taosMemoryFree(pResInfo->fields);
|
||||
}
|
||||
if (pResInfo->userFields != NULL) {
|
||||
taosMemoryFree(pResInfo->userFields);
|
||||
}
|
||||
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "parser.h"
|
||||
#include "planner.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tdef.h"
|
||||
#include "tglobal.h"
|
||||
|
@ -175,7 +174,6 @@ typedef struct {
|
|||
int32_t epoch;
|
||||
int32_t vgId;
|
||||
tsem_t rspSem;
|
||||
int32_t sync;
|
||||
} SMqPollCbParam;
|
||||
|
||||
typedef struct {
|
||||
|
@ -387,14 +385,16 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
|
|||
}
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i);
|
||||
tmq_list_append(*topics, strdup(topic->topicName));
|
||||
tmq_list_append(*topics, topic->topicName);
|
||||
}
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
|
||||
tmq_list_t* lst = tmq_list_new();
|
||||
return tmq_subscribe(tmq, lst);
|
||||
tmq_list_t* lst = tmq_list_new();
|
||||
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
|
||||
tmq_list_destroy(lst);
|
||||
return rsp;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -657,6 +657,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
// avoid double free if msg is sent
|
||||
buf = NULL;
|
||||
|
||||
tsem_wait(¶m.rspSem);
|
||||
tsem_destroy(¶m.rspSem);
|
||||
|
||||
|
@ -808,25 +811,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (pParam->sync == 1) {
|
||||
/**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/
|
||||
*pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
|
||||
if (*pParam->msg) {
|
||||
memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp));
|
||||
if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
|
||||
pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
|
||||
}
|
||||
taosWriteQitem(tmq->mqueue, *pParam->msg);
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
tsem_post(&pParam->rspSem);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
|
||||
if (pRspWrapper == NULL) {
|
||||
tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
|
||||
|
@ -1082,7 +1066,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
|
|||
return TMQ_RESP_ERR__FAIL;
|
||||
}
|
||||
|
||||
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
int64_t reqOffset;
|
||||
if (pVg->currentOffset >= 0) {
|
||||
reqOffset = pVg->currentOffset;
|
||||
|
@ -1094,7 +1078,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopi
|
|||
reqOffset = tmq->resetOffsetCfg;
|
||||
}
|
||||
|
||||
SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2));
|
||||
SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq));
|
||||
if (pReq == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1114,7 +1098,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopi
|
|||
pReq->reqId = generateRequestId();
|
||||
|
||||
pReq->head.vgId = htonl(pVg->vgId);
|
||||
pReq->head.contLen = htonl(sizeof(SMqPollReqV2));
|
||||
pReq->head.contLen = htonl(sizeof(SMqPollReq));
|
||||
return pReq;
|
||||
}
|
||||
|
||||
|
@ -1157,7 +1141,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
|
|||
#endif
|
||||
}
|
||||
atomic_store_32(&pVg->vgSkipCnt, 0);
|
||||
SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
|
||||
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
|
||||
if (pReq == NULL) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
|
@ -1175,7 +1159,6 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
|
|||
pParam->pTopic = pTopic;
|
||||
pParam->vgId = pVg->vgId;
|
||||
pParam->epoch = tmq->epoch;
|
||||
pParam->sync = 0;
|
||||
|
||||
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
|
@ -1188,7 +1171,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
|
|||
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = pReq,
|
||||
.len = sizeof(SMqPollReqV2),
|
||||
.len = sizeof(SMqPollReq),
|
||||
.handle = NULL,
|
||||
};
|
||||
sendInfo->requestId = pReq->reqId;
|
||||
|
@ -1282,7 +1265,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
|
|||
return (TAOS_RES*)rspObj;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__READY) {
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,12 +33,12 @@ extern "C" {
|
|||
|
||||
// tqDebug ===================
|
||||
// clang-format off
|
||||
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||
// clang-format on
|
||||
|
||||
#define TQ_BUFFER_SIZE 4
|
||||
|
|
|
@ -346,11 +346,11 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
|||
}
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||
SMqPollReqV2* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t waitTime = pReq->waitTime;
|
||||
int32_t reqEpoch = pReq->epoch;
|
||||
int64_t fetchOffset;
|
||||
SMqPollReq* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
int64_t waitTime = pReq->waitTime;
|
||||
int32_t reqEpoch = pReq->epoch;
|
||||
int64_t fetchOffset;
|
||||
|
||||
// get offset to fetch message
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
#ifdef USE_TD_MEMORY
|
||||
|
||||
#define TD_MEMORY_SYMBOL ('T'<<24|'A'<<16|'O'<<8|'S')
|
||||
#define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S')
|
||||
|
||||
#define TD_MEMORY_STACK_TRACE_DEPTH 10
|
||||
|
||||
|
@ -28,7 +28,7 @@ typedef struct TdMemoryInfo *TdMemoryInfoPtr;
|
|||
typedef struct TdMemoryInfo {
|
||||
int32_t symbol;
|
||||
int32_t memorySize;
|
||||
void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX
|
||||
void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX
|
||||
// TdMemoryInfoPtr pNext;
|
||||
// TdMemoryInfoPtr pPrev;
|
||||
} TdMemoryInfo;
|
||||
|
@ -36,11 +36,11 @@ typedef struct TdMemoryInfo {
|
|||
// static TdMemoryInfoPtr GlobalMemoryPtr = NULL;
|
||||
|
||||
#ifdef WINDOWS
|
||||
#define tstrdup(str) _strdup(str)
|
||||
#define tstrdup(str) _strdup(str)
|
||||
#else
|
||||
#define tstrdup(str) strdup(str)
|
||||
#define tstrdup(str) strdup(str)
|
||||
|
||||
#include<execinfo.h>
|
||||
#include <execinfo.h>
|
||||
|
||||
#define STACKCALL __attribute__((regparm(1), noinline))
|
||||
void **STACKCALL taosGetEbp(void) {
|
||||
|
@ -54,9 +54,9 @@ void **STACKCALL taosGetEbp(void) {
|
|||
|
||||
int32_t taosBackTrace(void **buffer, int32_t size) {
|
||||
int32_t frame = 0;
|
||||
void **ebp;
|
||||
void **ret = NULL;
|
||||
size_t func_frame_distance = 0;
|
||||
void **ebp;
|
||||
void **ret = NULL;
|
||||
size_t func_frame_distance = 0;
|
||||
if (buffer != NULL && size > 0) {
|
||||
ebp = taosGetEbp();
|
||||
func_frame_distance = (size_t)*ebp - (size_t)ebp;
|
||||
|
@ -89,9 +89,9 @@ void *taosMemoryMalloc(int32_t size) {
|
|||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
|
||||
pTdMemoryInfo->memorySize = size;
|
||||
pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL;
|
||||
taosBackTrace(pTdMemoryInfo->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
|
||||
return (char*)tmp + sizeof(TdMemoryInfo);
|
||||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
return malloc(size);
|
||||
#endif
|
||||
|
@ -100,15 +100,15 @@ void *taosMemoryMalloc(int32_t size) {
|
|||
void *taosMemoryCalloc(int32_t num, int32_t size) {
|
||||
#ifdef USE_TD_MEMORY
|
||||
int32_t memorySize = num * size;
|
||||
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
|
||||
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
|
||||
if (tmp == NULL) return NULL;
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
|
||||
pTdMemoryInfo->memorySize = memorySize;
|
||||
pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL;
|
||||
taosBackTrace(pTdMemoryInfo->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
|
||||
return (char*)tmp + sizeof(TdMemoryInfo);
|
||||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
return calloc(num, size);
|
||||
#endif
|
||||
|
@ -117,8 +117,8 @@ void *taosMemoryCalloc(int32_t num, int32_t size) {
|
|||
void *taosMemoryRealloc(void *ptr, int32_t size) {
|
||||
#ifdef USE_TD_MEMORY
|
||||
if (ptr == NULL) return taosMemoryMalloc(size);
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo));
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
||||
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
|
||||
|
||||
TdMemoryInfo tdMemoryInfo;
|
||||
|
@ -126,11 +126,11 @@ void *taosMemoryRealloc(void *ptr, int32_t size) {
|
|||
|
||||
void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo));
|
||||
if (tmp == NULL) return NULL;
|
||||
|
||||
|
||||
memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo));
|
||||
((TdMemoryInfoPtr)tmp)->memorySize = size;
|
||||
|
||||
return (char*)tmp + sizeof(TdMemoryInfo);
|
||||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
return realloc(ptr, size);
|
||||
#endif
|
||||
|
@ -139,29 +139,26 @@ void *taosMemoryRealloc(void *ptr, int32_t size) {
|
|||
void *taosMemoryStrDup(void *ptr) {
|
||||
#ifdef USE_TD_MEMORY
|
||||
if (ptr == NULL) return NULL;
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo));
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
||||
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
|
||||
|
||||
void *tmp = tstrdup((const char *)pTdMemoryInfo);
|
||||
if (tmp == NULL) return NULL;
|
||||
|
||||
memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo));
|
||||
taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
|
||||
return (char*)tmp + sizeof(TdMemoryInfo);
|
||||
memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo));
|
||||
taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH);
|
||||
|
||||
return (char *)tmp + sizeof(TdMemoryInfo);
|
||||
#else
|
||||
return tstrdup((const char *)ptr);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void taosMemoryFree(void *ptr) {
|
||||
if (ptr == NULL) return;
|
||||
|
||||
#ifdef USE_TD_MEMORY
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo));
|
||||
if(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
||||
if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) {
|
||||
pTdMemoryInfo->memorySize = 0;
|
||||
// memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo));
|
||||
free(pTdMemoryInfo);
|
||||
|
@ -177,7 +174,7 @@ int32_t taosMemorySize(void *ptr) {
|
|||
if (ptr == NULL) return 0;
|
||||
|
||||
#ifdef USE_TD_MEMORY
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo));
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
||||
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
|
||||
|
||||
return pTdMemoryInfo->memorySize;
|
||||
|
|
|
@ -128,6 +128,7 @@ echo "debugFlag 0" >> $TAOS_CFG
|
|||
echo "mDebugFlag 143" >> $TAOS_CFG
|
||||
echo "dDebugFlag 143" >> $TAOS_CFG
|
||||
echo "vDebugFlag 143" >> $TAOS_CFG
|
||||
echo "tqDebugFlag 143" >> $TAOS_CFG
|
||||
echo "tsdbDebugFlag 143" >> $TAOS_CFG
|
||||
echo "cDebugFlag 143" >> $TAOS_CFG
|
||||
echo "jniDebugFlag 143" >> $TAOS_CFG
|
||||
|
|
Loading…
Reference in New Issue