From 800e7c4e7acd19ec33c2524d1fdde61e8ce64495 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 19 Aug 2024 17:27:15 +0800 Subject: [PATCH 1/8] fix: memory leak of geos --- source/client/src/clientMain.c | 2 + source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 2 + source/util/src/tgeosctx.c | 92 +++++++++++++++++------- source/util/src/tsched.c | 2 +- source/util/src/tworker.c | 4 +- 5 files changed, 75 insertions(+), 27 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 12702a93f3..a403f9d1c2 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -23,6 +23,7 @@ #include "query.h" #include "scheduler.h" #include "tdatablock.h" +#include "tgeosctx.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" @@ -94,6 +95,7 @@ void taos_cleanup(void) { tmqMgmtClose(); DestroyRegexCache(); + destroyThreadLocalGeosCtx(); tscInfo("all local resources released"); taosCleanupCfg(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index fdce9fd4c9..1d62d4bd90 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -19,6 +19,7 @@ #include "index.h" #include "qworker.h" #include "tcompression.h" +#include "tgeosctx.h" #include "tglobal.h" #include "tgrant.h" #include "tstream.h" @@ -121,6 +122,7 @@ void dmCleanupDnode(SDnode *pDnode) { streamMetaCleanup(); indexCleanup(); taosConvDestroy(); + destroyThreadLocalGeosCtx(); // compress destroy tsCompressExit(); diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index a05734c911..47d5cc992b 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -15,38 +15,82 @@ #include "tgeosctx.h" #include "tdef.h" +#include "tlockfree.h" +#include "tlog.h" -static threadlocal SGeosContext tlGeosCtx = {0}; +typedef struct { + SGeosContext *pool; + int32_t capacity; + int32_t size; + SRWLatch lock; +} SGeosContextPool; -SGeosContext* getThreadLocalGeosCtx() { return &tlGeosCtx; } +static SGeosContextPool sGeosPool = {0}; -void destroyThreadLocalGeosCtx() { - if (tlGeosCtx.WKTReader) { - GEOSWKTReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTReader); - tlGeosCtx.WKTReader = NULL; +static threadlocal SGeosContext *tlGeosCtx = NULL; + +SGeosContext *getThreadLocalGeosCtx() { + if (tlGeosCtx) return tlGeosCtx; + + taosWLockLatch(&sGeosPool.lock); + if (sGeosPool.size >= sGeosPool.capacity) { + sGeosPool.capacity += 64; + void *tmp = taosMemoryRealloc(sGeosPool.pool, sGeosPool.capacity * sizeof(SGeosContext)); + if (!tmp) { + taosWUnLockLatch(&sGeosPool.lock); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + sGeosPool.pool = tmp; + TAOS_MEMSET(sGeosPool.pool + sGeosPool.size, 0, (sGeosPool.capacity - sGeosPool.size) * sizeof(SGeosContext)); } + tlGeosCtx = sGeosPool.pool + sGeosPool.size; + ++sGeosPool.size; + taosWUnLockLatch(&sGeosPool.lock); - if (tlGeosCtx.WKTWriter) { - GEOSWKTWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTWriter); - tlGeosCtx.WKTWriter = NULL; - } + return tlGeosCtx; +} - if (tlGeosCtx.WKBReader) { - GEOSWKBReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBReader); - tlGeosCtx.WKBReader = NULL; - } +static void destroyGeosCtx(SGeosContext *pCtx) { + if (pCtx) { + if (pCtx->WKTReader) { + GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader); + pCtx->WKTReader = NULL; + } - if (tlGeosCtx.WKBWriter) { - GEOSWKBWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBWriter); - tlGeosCtx.WKBWriter = NULL; - } + if (pCtx->WKTWriter) { + GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter); + pCtx->WKTWriter = NULL; + } - if (tlGeosCtx.WKTRegex) { - destroyRegexes(tlGeosCtx.WKTRegex, tlGeosCtx.WKTMatchData); - } + if (pCtx->WKBReader) { + GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader); + pCtx->WKBReader = NULL; + } - if (tlGeosCtx.handle) { - GEOS_finish_r(tlGeosCtx.handle); - tlGeosCtx.handle = NULL; + if (pCtx->WKBWriter) { + GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter); + pCtx->WKBWriter = NULL; + } + + if (pCtx->WKTRegex) { + destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData); + pCtx->WKTRegex = NULL; + pCtx->WKTMatchData = NULL; + } + + if (pCtx->handle) { + GEOS_finish_r(pCtx->handle); + pCtx->handle = NULL; + } } } + +void destroyThreadLocalGeosCtx() { + uInfo("geos ctx is cleaned up"); + if (!sGeosPool.pool) return; + for (int32_t i = 0; i < sGeosPool.size; ++i) { + destroyGeosCtx(sGeosPool.pool + i); + } + taosMemoryFreeClear(sGeosPool.pool); +} \ No newline at end of file diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 6779e8dee5..509dba0890 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -178,7 +178,7 @@ void *taosProcessSchedQueue(void *scheduler) { (*(msg.tfp))(msg.ahandle, msg.thandle); } - destroyThreadLocalGeosCtx(); + // destroyThreadLocalGeosCtx(); return NULL; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index b2064d6787..2da1abed78 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -105,7 +105,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } - destroyThreadLocalGeosCtx(); + // destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; @@ -665,7 +665,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } - destroyThreadLocalGeosCtx(); + // destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; From 0531a4f4bd7a26468232cbedfd1ae4b62c33aa0f Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 19 Aug 2024 18:36:39 +0800 Subject: [PATCH 2/8] fix: memory leak of geos --- include/util/tgeosctx.h | 8 ++-- source/client/src/clientMain.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 2 +- source/libs/executor/src/sysscanoperator.c | 2 +- source/libs/geometry/src/geosWrapper.c | 45 +++++++++++++++++++++- source/libs/parser/src/parInsertSql.c | 4 +- source/libs/scalar/src/sclvector.c | 8 ++-- source/util/src/tgeosctx.c | 17 +++++--- source/util/src/tsched.c | 2 - source/util/src/tworker.c | 2 - tools/shell/src/shellEngine.c | 5 +-- 11 files changed, 71 insertions(+), 26 deletions(-) diff --git a/include/util/tgeosctx.h b/include/util/tgeosctx.h index 267ba9e049..a4355db29a 100644 --- a/include/util/tgeosctx.h +++ b/include/util/tgeosctx.h @@ -32,14 +32,16 @@ typedef struct SGeosContext { GEOSWKBReader *WKBReader; GEOSWKBWriter *WKBWriter; - pcre2_code *WKTRegex; + pcre2_code *WKTRegex; pcre2_match_data *WKTMatchData; char errMsg[512]; } SGeosContext; -SGeosContext* getThreadLocalGeosCtx(); -void destroyThreadLocalGeosCtx(); +SGeosContext *acquireThreadLocalGeosCtx(); +SGeosContext *getThreadLocalGeosCtx(); +const char *getGeosErrMsg(int32_t code); +void taosGeosDestroy(); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a403f9d1c2..ec3147f49a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -91,11 +91,11 @@ void taos_cleanup(void) { tscDebug("rpc cleanup"); taosConvDestroy(); + taosGeosDestroy(); tmqMgmtClose(); DestroyRegexCache(); - destroyThreadLocalGeosCtx(); tscInfo("all local resources released"); taosCleanupCfg(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 1d62d4bd90..ae6efa7af4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -122,7 +122,7 @@ void dmCleanupDnode(SDnode *pDnode) { streamMetaCleanup(); indexCleanup(); taosConvDestroy(); - destroyThreadLocalGeosCtx(); + taosGeosDestroy(); // compress destroy tsCompressExit(); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index d8a2331980..082d4e7789 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -979,7 +979,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) || TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) { - qError("geo text for systable failed:%s", getThreadLocalGeosCtx()->errMsg); + qError("geo text for systable failed:%s", getGeosErrMsg(code)); *output = NULL; *nOutput = 0; return code; diff --git a/source/libs/geometry/src/geosWrapper.c b/source/libs/geometry/src/geosWrapper.c index dde34edc91..4f3f7d75c2 100644 --- a/source/libs/geometry/src/geosWrapper.c +++ b/source/libs/geometry/src/geosWrapper.c @@ -23,7 +23,8 @@ typedef char (*_geosPreparedRelationFunc_t)(GEOSContextHandle_t handle, const GE void geosFreeBuffer(void *buffer) { if (buffer) { - GEOSFree_r(getThreadLocalGeosCtx()->handle, buffer); + SGeosContext *pCtx = acquireThreadLocalGeosCtx(); + if (pCtx) GEOSFree_r(pCtx->handle, buffer); } } @@ -36,6 +37,11 @@ int32_t initCtxMakePoint() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -61,6 +67,11 @@ int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -166,6 +177,11 @@ int32_t initCtxGeomFromText() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -202,6 +218,11 @@ int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -237,6 +258,11 @@ int32_t initCtxAsText() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -273,6 +299,11 @@ int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + GEOSGeometry *geom = NULL; char *wkt = NULL; @@ -304,6 +335,11 @@ int32_t initCtxRelationFunc() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -329,6 +365,10 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr _geosPreparedRelationFunc_t swappedPreparedRelationFn) { SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (!preparedGeom1) { if (!swapped) { ASSERT(relationFn); @@ -390,6 +430,9 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, const GEOSPreparedGeometry **outputPreparedGeom) { SGeosContext *geosCtx = getThreadLocalGeosCtx(); + if (!geosCtx) { + return TSDB_CODE_OUT_OF_MEMORY; + } ASSERT(outputGeom); // it is not allowed if outputGeom is NULL *outputGeom = NULL; diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index cb94cd42f7..aa6116287e 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -655,7 +655,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, code = parseGeometry(pToken, &output, &size); if (code != TSDB_CODE_SUCCESS) { - code = buildSyntaxErrMsg(pMsgBuf, getThreadLocalGeosCtx()->errMsg, pToken->z); + code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z); } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { // Too long values will raise the invalid sql error message code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); @@ -1646,7 +1646,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, code = parseGeometry(pToken, &output, &size); if (code != TSDB_CODE_SUCCESS) { - code = buildSyntaxErrMsg(&pCxt->msg, getThreadLocalGeosCtx()->errMsg, pToken->z); + code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z); } // Too long values will raise the invalid sql error message else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 71773ced57..daf44ec527 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -441,12 +441,12 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t unsigned char *t = NULL; char *output = NULL; - if (initCtxGeomFromText()) { - sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg); + if ((code = initCtxGeomFromText()) != 0) { + sclError("failed to init geometry ctx, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_APP_ERROR); } - if (doGeomFromText(buf, &t, &len)) { - sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg); + if ((code = doGeomFromText(buf, &t, &len))) { + sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); } diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 47d5cc992b..473b7539fc 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -25,16 +25,19 @@ typedef struct { SRWLatch lock; } SGeosContextPool; -static SGeosContextPool sGeosPool = {0}; - +static SGeosContextPool sGeosPool = {0}; static threadlocal SGeosContext *tlGeosCtx = NULL; +SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } + SGeosContext *getThreadLocalGeosCtx() { - if (tlGeosCtx) return tlGeosCtx; + if (tlGeosCtx) { + return tlGeosCtx; + } taosWLockLatch(&sGeosPool.lock); if (sGeosPool.size >= sGeosPool.capacity) { - sGeosPool.capacity += 64; + sGeosPool.capacity += 128; void *tmp = taosMemoryRealloc(sGeosPool.pool, sGeosPool.capacity * sizeof(SGeosContext)); if (!tmp) { taosWUnLockLatch(&sGeosPool.lock); @@ -51,6 +54,8 @@ SGeosContext *getThreadLocalGeosCtx() { return tlGeosCtx; } +const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); } + static void destroyGeosCtx(SGeosContext *pCtx) { if (pCtx) { if (pCtx->WKTReader) { @@ -86,8 +91,8 @@ static void destroyGeosCtx(SGeosContext *pCtx) { } } -void destroyThreadLocalGeosCtx() { - uInfo("geos ctx is cleaned up"); +void taosGeosDestroy() { + uInfo("geos is cleaned up"); if (!sGeosPool.pool) return; for (int32_t i = 0; i < sGeosPool.size; ++i) { destroyGeosCtx(sGeosPool.pool + i); diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 509dba0890..55a927f340 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -178,8 +178,6 @@ void *taosProcessSchedQueue(void *scheduler) { (*(msg.tfp))(msg.ahandle, msg.thandle); } - // destroyThreadLocalGeosCtx(); - return NULL; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 2da1abed78..ebec134c91 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -105,7 +105,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } - // destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; @@ -665,7 +664,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } - // destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 0ccbd683dc..2c8330c433 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -611,14 +611,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width) code = initCtxAsText(); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(getThreadLocalGeosCtx()->errMsg, width); + shellPrintString(getGeosErrMsg(code), width); return; } char *outputWKT = NULL; code = doAsText(val, length, &outputWKT); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(getThreadLocalGeosCtx()->errMsg, width); // should NOT happen + shellPrintString(getGeosErrMsg(code), width); // should NOT happen return; } @@ -1282,7 +1282,6 @@ void *shellThreadLoop(void *arg) { taosResetTerminalMode(); } while (shellRunCommand(command, true) == 0); - destroyThreadLocalGeosCtx(); taosMemoryFreeClear(command); shellWriteHistory(); shellExit(); From 6055b9172ec4e1192c077b7de1f57438ef444388 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 20 Aug 2024 09:03:57 +0800 Subject: [PATCH 3/8] fix: memory leak of geos --- source/libs/geometry/src/geosWrapper.c | 51 +++++++------------------- source/util/src/tgeosctx.c | 4 +- 2 files changed, 15 insertions(+), 40 deletions(-) diff --git a/source/libs/geometry/src/geosWrapper.c b/source/libs/geometry/src/geosWrapper.c index 4f3f7d75c2..7372521276 100644 --- a/source/libs/geometry/src/geosWrapper.c +++ b/source/libs/geometry/src/geosWrapper.c @@ -37,10 +37,7 @@ int32_t initCtxMakePoint() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -67,10 +64,7 @@ int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -177,10 +171,7 @@ int32_t initCtxGeomFromText() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -218,10 +209,7 @@ int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -258,10 +246,7 @@ int32_t initCtxAsText() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -299,10 +284,7 @@ int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; GEOSGeometry *geom = NULL; char *wkt = NULL; @@ -335,10 +317,7 @@ int32_t initCtxRelationFunc() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -365,9 +344,7 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr _geosPreparedRelationFunc_t swappedPreparedRelationFn) { SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - return TSDB_CODE_OUT_OF_MEMORY; - } + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; if (!preparedGeom1) { if (!swapped) { @@ -429,11 +406,6 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry // need to call destroyGeometry(outputGeom, outputPreparedGeom) later int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, const GEOSPreparedGeometry **outputPreparedGeom) { - SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) { - return TSDB_CODE_OUT_OF_MEMORY; - } - ASSERT(outputGeom); // it is not allowed if outputGeom is NULL *outputGeom = NULL; @@ -445,6 +417,10 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, return TSDB_CODE_SUCCESS; } + SGeosContext *geosCtx = getThreadLocalGeosCtx(); + + if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; + *outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input)); if (*outputGeom == NULL) { return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; @@ -461,7 +437,8 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, } void destroyGeometry(GEOSGeometry **geom, const GEOSPreparedGeometry **preparedGeom) { - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = acquireThreadLocalGeosCtx(); + if (!geosCtx) return; if (preparedGeom && *preparedGeom) { GEOSPreparedGeom_destroy_r(geosCtx->handle, *preparedGeom); diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 473b7539fc..5d47452fda 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -31,9 +31,7 @@ static threadlocal SGeosContext *tlGeosCtx = NULL; SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } SGeosContext *getThreadLocalGeosCtx() { - if (tlGeosCtx) { - return tlGeosCtx; - } + if (tlGeosCtx) return tlGeosCtx; taosWLockLatch(&sGeosPool.lock); if (sGeosPool.size >= sGeosPool.capacity) { From a2217973aeb70a8d7651263c7f11c6dd6e0c7a99 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 20 Aug 2024 09:07:29 +0800 Subject: [PATCH 4/8] fix: memory leak of geos --- source/libs/scalar/src/sclvector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index daf44ec527..e20b0cb6fc 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -445,7 +445,7 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t sclError("failed to init geometry ctx, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_APP_ERROR); } - if ((code = doGeomFromText(buf, &t, &len))) { + if ((code = doGeomFromText(buf, &t, &len)) != 0) { sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); } From 2f4d0815920a46e45f38fb1e927528e4ff5ca07f Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 20 Aug 2024 09:08:22 +0800 Subject: [PATCH 5/8] fix: memory leak of geos --- source/libs/scalar/src/sclvector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index e20b0cb6fc..9bac02e5f9 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -446,7 +446,7 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t SCL_ERR_JRET(TSDB_CODE_APP_ERROR); } if ((code = doGeomFromText(buf, &t, &len)) != 0) { - sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code)); + sclError("failed to convert text to geometry, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); } From 66878fd040922b8881501d04768d879c9f5316b0 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 20 Aug 2024 10:10:42 +0800 Subject: [PATCH 6/8] fix: memory leak of geos --- source/util/src/tgeosctx.c | 45 ++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 5d47452fda..42cde5b8c7 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -14,14 +14,16 @@ */ #include "tgeosctx.h" +#include "tarray.h" #include "tdef.h" #include "tlockfree.h" #include "tlog.h" +#define GEOS_POOL_CAPACITY 64 typedef struct { - SGeosContext *pool; - int32_t capacity; - int32_t size; + SArray *poolArray; // totalSize: (GEOS_POOL_CAPACITY * (taosArrayGetSize(poolArray) - 1)) + size + SGeosContext *pool; // current SGeosContext pool + int32_t size; // size of current SGeosContext pool, size <= GEOS_POOL_CAPACITY SRWLatch lock; } SGeosContextPool; @@ -34,16 +36,26 @@ SGeosContext *getThreadLocalGeosCtx() { if (tlGeosCtx) return tlGeosCtx; taosWLockLatch(&sGeosPool.lock); - if (sGeosPool.size >= sGeosPool.capacity) { - sGeosPool.capacity += 128; - void *tmp = taosMemoryRealloc(sGeosPool.pool, sGeosPool.capacity * sizeof(SGeosContext)); - if (!tmp) { + if (!sGeosPool.pool || sGeosPool.size >= GEOS_POOL_CAPACITY) { + if (!(sGeosPool.pool = (SGeosContext *)taosMemoryCalloc(GEOS_POOL_CAPACITY, sizeof(SGeosContext)))) { taosWUnLockLatch(&sGeosPool.lock); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - sGeosPool.pool = tmp; - TAOS_MEMSET(sGeosPool.pool + sGeosPool.size, 0, (sGeosPool.capacity - sGeosPool.size) * sizeof(SGeosContext)); + if (!sGeosPool.poolArray) { + if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) { + taosMemoryFree(sGeosPool.pool); + sGeosPool.pool = NULL; + taosWUnLockLatch(&sGeosPool.lock); + return NULL; + } + } + if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) { + taosMemoryFree(sGeosPool.pool); + sGeosPool.pool = NULL; + taosWUnLockLatch(&sGeosPool.lock); + return NULL; + } + sGeosPool.size = 0; } tlGeosCtx = sGeosPool.pool + sGeosPool.size; ++sGeosPool.size; @@ -91,9 +103,14 @@ static void destroyGeosCtx(SGeosContext *pCtx) { void taosGeosDestroy() { uInfo("geos is cleaned up"); - if (!sGeosPool.pool) return; - for (int32_t i = 0; i < sGeosPool.size; ++i) { - destroyGeosCtx(sGeosPool.pool + i); + int32_t size = taosArrayGetSize(sGeosPool.poolArray); + for (int32_t i = 0; i < size; ++i) { + SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i); + for (int32_t j = 0; j < GEOS_POOL_CAPACITY; ++j) { + destroyGeosCtx(pool + j); + } + taosMemoryFree(pool); } - taosMemoryFreeClear(sGeosPool.pool); + taosArrayDestroy(sGeosPool.poolArray); + sGeosPool.poolArray = NULL; } \ No newline at end of file From 5e77f6f6ca873e0b1aa308e5721a383fdec1f7d6 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 20 Aug 2024 10:18:14 +0800 Subject: [PATCH 7/8] fix: memory leak of geos --- source/util/src/tgeosctx.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 42cde5b8c7..99655ed7f7 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -43,15 +43,13 @@ SGeosContext *getThreadLocalGeosCtx() { } if (!sGeosPool.poolArray) { if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) { - taosMemoryFree(sGeosPool.pool); - sGeosPool.pool = NULL; + taosMemoryFreeClear(sGeosPool.pool); taosWUnLockLatch(&sGeosPool.lock); return NULL; } } if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) { - taosMemoryFree(sGeosPool.pool); - sGeosPool.pool = NULL; + taosMemoryFreeClear(sGeosPool.pool); taosWUnLockLatch(&sGeosPool.lock); return NULL; } From 7ac14bca97d841839bf299cdc994b512c149a110 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 20 Aug 2024 14:06:35 +0800 Subject: [PATCH 8/8] fix: memory leak of geos --- source/util/src/tgeosctx.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 99655ed7f7..82a360edd1 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -104,7 +104,8 @@ void taosGeosDestroy() { int32_t size = taosArrayGetSize(sGeosPool.poolArray); for (int32_t i = 0; i < size; ++i) { SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i); - for (int32_t j = 0; j < GEOS_POOL_CAPACITY; ++j) { + int32_t poolSize = i == size - 1 ? sGeosPool.size : GEOS_POOL_CAPACITY; + for (int32_t j = 0; j < poolSize; ++j) { destroyGeosCtx(pool + j); } taosMemoryFree(pool);