diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3cabb030ad..a6dd51b035 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2385,7 +2385,7 @@ typedef struct { int64_t consumerId; int64_t waitTime; int64_t currentOffset; -} SMqPollReqV2; +} SMqPollReq; typedef struct { int32_t vgId; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a00fd4714e..7c873acadb 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -246,6 +246,12 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t pResInfo->numOfCols = numOfCols; // TODO handle memory leak + if (pResInfo->fields != NULL) { + taosMemoryFree(pResInfo->fields); + } + if (pResInfo->userFields != NULL) { + taosMemoryFree(pResInfo->userFields); + } pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 585de7fe3f..d0f6a296d9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -16,7 +16,6 @@ #include "clientInt.h" #include "clientLog.h" #include "parser.h" -#include "planner.h" #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" @@ -175,7 +174,6 @@ typedef struct { int32_t epoch; int32_t vgId; tsem_t rspSem; - int32_t sync; } SMqPollCbParam; typedef struct { @@ -387,14 +385,16 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); - tmq_list_append(*topics, strdup(topic->topicName)); + tmq_list_append(*topics, topic->topicName); } return TMQ_RESP_ERR__SUCCESS; } tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { - tmq_list_t* lst = tmq_list_new(); - return tmq_subscribe(tmq, lst); + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + return rsp; } #if 0 @@ -657,6 +657,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + // avoid double free if msg is sent + buf = NULL; + tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); @@ -808,25 +811,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); } -#if 0 - if (pParam->sync == 1) { - /**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/ - *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t)); - if (*pParam->msg) { - memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp)); - if ((*pParam->msg)->consumeRsp.numOfTopics != 0) { - pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset; - } - taosWriteQitem(tmq->mqueue, *pParam->msg); - tsem_post(&pParam->rspSem); - return 0; - } - tsem_post(&pParam->rspSem); - return -1; - } -#endif - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper)); if (pRspWrapper == NULL) { tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); @@ -1082,7 +1066,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -1094,7 +1078,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopi reqOffset = tmq->resetOffsetCfg; } - SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2)); + SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq)); if (pReq == NULL) { return NULL; } @@ -1114,7 +1098,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopi pReq->reqId = generateRequestId(); pReq->head.vgId = htonl(pVg->vgId); - pReq->head.contLen = htonl(sizeof(SMqPollReqV2)); + pReq->head.contLen = htonl(sizeof(SMqPollReq)); return pReq; } @@ -1157,7 +1141,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { #endif } atomic_store_32(&pVg->vgSkipCnt, 0); - SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg); + SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); @@ -1175,7 +1159,6 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { pParam->pTopic = pTopic; pParam->vgId = pVg->vgId; pParam->epoch = tmq->epoch; - pParam->sync = 0; SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); if (sendInfo == NULL) { @@ -1188,7 +1171,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqPollReqV2), + .len = sizeof(SMqPollReq), .handle = NULL, }; sendInfo->requestId = pReq->reqId; @@ -1282,7 +1265,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { return (TAOS_RES*)rspObj; } - if (atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__READY) { + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { return NULL; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b0461067e1..5eb89e8bb7 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -33,12 +33,12 @@ extern "C" { // tqDebug =================== // clang-format off -#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on #define TQ_BUFFER_SIZE 4 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 31ab1390c6..7cee82f660 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -346,11 +346,11 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { - SMqPollReqV2* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int64_t waitTime = pReq->waitTime; - int32_t reqEpoch = pReq->epoch; - int64_t fetchOffset; + SMqPollReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t waitTime = pReq->waitTime; + int32_t reqEpoch = pReq->epoch; + int64_t fetchOffset; // get offset to fetch message if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 5b733daec2..73c37c28f7 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -19,7 +19,7 @@ #ifdef USE_TD_MEMORY -#define TD_MEMORY_SYMBOL ('T'<<24|'A'<<16|'O'<<8|'S') +#define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S') #define TD_MEMORY_STACK_TRACE_DEPTH 10 @@ -28,7 +28,7 @@ typedef struct TdMemoryInfo *TdMemoryInfoPtr; typedef struct TdMemoryInfo { int32_t symbol; int32_t memorySize; - void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX + void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX // TdMemoryInfoPtr pNext; // TdMemoryInfoPtr pPrev; } TdMemoryInfo; @@ -36,11 +36,11 @@ typedef struct TdMemoryInfo { // static TdMemoryInfoPtr GlobalMemoryPtr = NULL; #ifdef WINDOWS - #define tstrdup(str) _strdup(str) +#define tstrdup(str) _strdup(str) #else - #define tstrdup(str) strdup(str) +#define tstrdup(str) strdup(str) -#include +#include #define STACKCALL __attribute__((regparm(1), noinline)) void **STACKCALL taosGetEbp(void) { @@ -54,9 +54,9 @@ void **STACKCALL taosGetEbp(void) { int32_t taosBackTrace(void **buffer, int32_t size) { int32_t frame = 0; - void **ebp; - void **ret = NULL; - size_t func_frame_distance = 0; + void **ebp; + void **ret = NULL; + size_t func_frame_distance = 0; if (buffer != NULL && size > 0) { ebp = taosGetEbp(); func_frame_distance = (size_t)*ebp - (size_t)ebp; @@ -89,9 +89,9 @@ void *taosMemoryMalloc(int32_t size) { TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; pTdMemoryInfo->memorySize = size; pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; - taosBackTrace(pTdMemoryInfo->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH); + taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return malloc(size); #endif @@ -100,15 +100,15 @@ void *taosMemoryMalloc(int32_t size) { void *taosMemoryCalloc(int32_t num, int32_t size) { #ifdef USE_TD_MEMORY int32_t memorySize = num * size; - char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); + char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); if (tmp == NULL) return NULL; TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; pTdMemoryInfo->memorySize = memorySize; pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; - taosBackTrace(pTdMemoryInfo->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH); + taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return calloc(num, size); #endif @@ -117,8 +117,8 @@ void *taosMemoryCalloc(int32_t num, int32_t size) { void *taosMemoryRealloc(void *ptr, int32_t size) { #ifdef USE_TD_MEMORY if (ptr == NULL) return taosMemoryMalloc(size); - - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); + + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); TdMemoryInfo tdMemoryInfo; @@ -126,11 +126,11 @@ void *taosMemoryRealloc(void *ptr, int32_t size) { void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); if (tmp == NULL) return NULL; - + memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); ((TdMemoryInfoPtr)tmp)->memorySize = size; - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return realloc(ptr, size); #endif @@ -139,29 +139,26 @@ void *taosMemoryRealloc(void *ptr, int32_t size) { void *taosMemoryStrDup(void *ptr) { #ifdef USE_TD_MEMORY if (ptr == NULL) return NULL; - - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); + + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); void *tmp = tstrdup((const char *)pTdMemoryInfo); if (tmp == NULL) return NULL; - - memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo)); - taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH); - return (char*)tmp + sizeof(TdMemoryInfo); + memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo)); + taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); + + return (char *)tmp + sizeof(TdMemoryInfo); #else return tstrdup((const char *)ptr); #endif } - void taosMemoryFree(void *ptr) { - if (ptr == NULL) return; - #ifdef USE_TD_MEMORY - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); - if(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); + if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { pTdMemoryInfo->memorySize = 0; // memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo)); free(pTdMemoryInfo); @@ -177,7 +174,7 @@ int32_t taosMemorySize(void *ptr) { if (ptr == NULL) return 0; #ifdef USE_TD_MEMORY - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); return pTdMemoryInfo->memorySize; diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index ec847dedbb..da295f640e 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -128,6 +128,7 @@ echo "debugFlag 0" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG echo "dDebugFlag 143" >> $TAOS_CFG echo "vDebugFlag 143" >> $TAOS_CFG +echo "tqDebugFlag 143" >> $TAOS_CFG echo "tsdbDebugFlag 143" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG echo "jniDebugFlag 143" >> $TAOS_CFG