From 87e56d3d67f394394759efd066e7925451267d4b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 21 Aug 2024 10:23:22 +0800 Subject: [PATCH] Revert "fix: memory leak of geos" --- include/util/tgeosctx.h | 8 +- source/client/src/clientMain.c | 2 - source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 2 - source/libs/executor/src/sysscanoperator.c | 2 +- source/libs/geometry/src/geosWrapper.c | 28 +----- source/libs/parser/src/parInsertSql.c | 4 +- source/libs/scalar/src/sclvector.c | 8 +- source/util/src/tgeosctx.c | 111 +++++---------------- source/util/src/tsched.c | 2 + source/util/src/tworker.c | 2 + tools/shell/src/shellEngine.c | 5 +- 11 files changed, 45 insertions(+), 129 deletions(-) diff --git a/include/util/tgeosctx.h b/include/util/tgeosctx.h index a4355db29a..267ba9e049 100644 --- a/include/util/tgeosctx.h +++ b/include/util/tgeosctx.h @@ -32,16 +32,14 @@ typedef struct SGeosContext { GEOSWKBReader *WKBReader; GEOSWKBWriter *WKBWriter; - pcre2_code *WKTRegex; + pcre2_code *WKTRegex; pcre2_match_data *WKTMatchData; char errMsg[512]; } SGeosContext; -SGeosContext *acquireThreadLocalGeosCtx(); -SGeosContext *getThreadLocalGeosCtx(); -const char *getGeosErrMsg(int32_t code); -void taosGeosDestroy(); +SGeosContext* getThreadLocalGeosCtx(); +void destroyThreadLocalGeosCtx(); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c0eaf27077..d007dae7f7 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -23,7 +23,6 @@ #include "query.h" #include "scheduler.h" #include "tdatablock.h" -#include "tgeosctx.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" @@ -87,7 +86,6 @@ void taos_cleanup(void) { tscDebug("rpc cleanup"); taosConvDestroy(); - taosGeosDestroy(); tmqMgmtClose(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index ae6efa7af4..fdce9fd4c9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -19,7 +19,6 @@ #include "index.h" #include "qworker.h" #include "tcompression.h" -#include "tgeosctx.h" #include "tglobal.h" #include "tgrant.h" #include "tstream.h" @@ -122,7 +121,6 @@ void dmCleanupDnode(SDnode *pDnode) { streamMetaCleanup(); indexCleanup(); taosConvDestroy(); - taosGeosDestroy(); // compress destroy tsCompressExit(); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 082d4e7789..d8a2331980 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -979,7 +979,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) || TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) { - qError("geo text for systable failed:%s", getGeosErrMsg(code)); + qError("geo text for systable failed:%s", getThreadLocalGeosCtx()->errMsg); *output = NULL; *nOutput = 0; return code; diff --git a/source/libs/geometry/src/geosWrapper.c b/source/libs/geometry/src/geosWrapper.c index 7372521276..dde34edc91 100644 --- a/source/libs/geometry/src/geosWrapper.c +++ b/source/libs/geometry/src/geosWrapper.c @@ -23,8 +23,7 @@ typedef char (*_geosPreparedRelationFunc_t)(GEOSContextHandle_t handle, const GE void geosFreeBuffer(void *buffer) { if (buffer) { - SGeosContext *pCtx = acquireThreadLocalGeosCtx(); - if (pCtx) GEOSFree_r(pCtx->handle, buffer); + GEOSFree_r(getThreadLocalGeosCtx()->handle, buffer); } } @@ -37,8 +36,6 @@ int32_t initCtxMakePoint() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -64,8 +61,6 @@ int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -171,8 +166,6 @@ int32_t initCtxGeomFromText() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -209,8 +202,6 @@ int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -246,8 +237,6 @@ int32_t initCtxAsText() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -284,8 +273,6 @@ int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - GEOSGeometry *geom = NULL; char *wkt = NULL; @@ -317,8 +304,6 @@ int32_t initCtxRelationFunc() { int32_t code = TSDB_CODE_FAILED; SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); if (geosCtx->handle == NULL) { @@ -344,8 +329,6 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr _geosPreparedRelationFunc_t swappedPreparedRelationFn) { SGeosContext *geosCtx = getThreadLocalGeosCtx(); - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - if (!preparedGeom1) { if (!swapped) { ASSERT(relationFn); @@ -406,6 +389,8 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry // need to call destroyGeometry(outputGeom, outputPreparedGeom) later int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, const GEOSPreparedGeometry **outputPreparedGeom) { + SGeosContext *geosCtx = getThreadLocalGeosCtx(); + ASSERT(outputGeom); // it is not allowed if outputGeom is NULL *outputGeom = NULL; @@ -417,10 +402,6 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, return TSDB_CODE_SUCCESS; } - SGeosContext *geosCtx = getThreadLocalGeosCtx(); - - if (!geosCtx) return TSDB_CODE_OUT_OF_MEMORY; - *outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input)); if (*outputGeom == NULL) { return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; @@ -437,8 +418,7 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, } void destroyGeometry(GEOSGeometry **geom, const GEOSPreparedGeometry **preparedGeom) { - SGeosContext *geosCtx = acquireThreadLocalGeosCtx(); - if (!geosCtx) return; + SGeosContext *geosCtx = getThreadLocalGeosCtx(); if (preparedGeom && *preparedGeom) { GEOSPreparedGeom_destroy_r(geosCtx->handle, *preparedGeom); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index aa6116287e..cb94cd42f7 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -655,7 +655,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, code = parseGeometry(pToken, &output, &size); if (code != TSDB_CODE_SUCCESS) { - code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z); + code = buildSyntaxErrMsg(pMsgBuf, getThreadLocalGeosCtx()->errMsg, pToken->z); } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { // Too long values will raise the invalid sql error message code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); @@ -1646,7 +1646,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, code = parseGeometry(pToken, &output, &size); if (code != TSDB_CODE_SUCCESS) { - code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z); + code = buildSyntaxErrMsg(&pCxt->msg, getThreadLocalGeosCtx()->errMsg, pToken->z); } // Too long values will raise the invalid sql error message else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index bc8a2ae233..5556108a52 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -446,12 +446,12 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t unsigned char *t = NULL; char *output = NULL; - if ((code = initCtxGeomFromText()) != 0) { - sclError("failed to init geometry ctx, %s", getGeosErrMsg(code)); + if (initCtxGeomFromText()) { + sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg); SCL_ERR_JRET(TSDB_CODE_APP_ERROR); } - if ((code = doGeomFromText(buf, &t, &len)) != 0) { - sclError("failed to convert text to geometry, %s", getGeosErrMsg(code)); + if (doGeomFromText(buf, &t, &len)) { + sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); } diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 82a360edd1..a05734c911 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -14,102 +14,39 @@ */ #include "tgeosctx.h" -#include "tarray.h" #include "tdef.h" -#include "tlockfree.h" -#include "tlog.h" -#define GEOS_POOL_CAPACITY 64 -typedef struct { - SArray *poolArray; // totalSize: (GEOS_POOL_CAPACITY * (taosArrayGetSize(poolArray) - 1)) + size - SGeosContext *pool; // current SGeosContext pool - int32_t size; // size of current SGeosContext pool, size <= GEOS_POOL_CAPACITY - SRWLatch lock; -} SGeosContextPool; +static threadlocal SGeosContext tlGeosCtx = {0}; -static SGeosContextPool sGeosPool = {0}; -static threadlocal SGeosContext *tlGeosCtx = NULL; +SGeosContext* getThreadLocalGeosCtx() { return &tlGeosCtx; } -SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } - -SGeosContext *getThreadLocalGeosCtx() { - if (tlGeosCtx) return tlGeosCtx; - - taosWLockLatch(&sGeosPool.lock); - if (!sGeosPool.pool || sGeosPool.size >= GEOS_POOL_CAPACITY) { - if (!(sGeosPool.pool = (SGeosContext *)taosMemoryCalloc(GEOS_POOL_CAPACITY, sizeof(SGeosContext)))) { - taosWUnLockLatch(&sGeosPool.lock); - return NULL; - } - if (!sGeosPool.poolArray) { - if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) { - taosMemoryFreeClear(sGeosPool.pool); - taosWUnLockLatch(&sGeosPool.lock); - return NULL; - } - } - if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) { - taosMemoryFreeClear(sGeosPool.pool); - taosWUnLockLatch(&sGeosPool.lock); - return NULL; - } - sGeosPool.size = 0; +void destroyThreadLocalGeosCtx() { + if (tlGeosCtx.WKTReader) { + GEOSWKTReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTReader); + tlGeosCtx.WKTReader = NULL; } - tlGeosCtx = sGeosPool.pool + sGeosPool.size; - ++sGeosPool.size; - taosWUnLockLatch(&sGeosPool.lock); - return tlGeosCtx; -} + if (tlGeosCtx.WKTWriter) { + GEOSWKTWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTWriter); + tlGeosCtx.WKTWriter = NULL; + } -const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); } + if (tlGeosCtx.WKBReader) { + GEOSWKBReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBReader); + tlGeosCtx.WKBReader = NULL; + } -static void destroyGeosCtx(SGeosContext *pCtx) { - if (pCtx) { - if (pCtx->WKTReader) { - GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader); - pCtx->WKTReader = NULL; - } + if (tlGeosCtx.WKBWriter) { + GEOSWKBWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBWriter); + tlGeosCtx.WKBWriter = NULL; + } - if (pCtx->WKTWriter) { - GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter); - pCtx->WKTWriter = NULL; - } + if (tlGeosCtx.WKTRegex) { + destroyRegexes(tlGeosCtx.WKTRegex, tlGeosCtx.WKTMatchData); + } - if (pCtx->WKBReader) { - GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader); - pCtx->WKBReader = NULL; - } - - if (pCtx->WKBWriter) { - GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter); - pCtx->WKBWriter = NULL; - } - - if (pCtx->WKTRegex) { - destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData); - pCtx->WKTRegex = NULL; - pCtx->WKTMatchData = NULL; - } - - if (pCtx->handle) { - GEOS_finish_r(pCtx->handle); - pCtx->handle = NULL; - } + if (tlGeosCtx.handle) { + GEOS_finish_r(tlGeosCtx.handle); + tlGeosCtx.handle = NULL; } } - -void taosGeosDestroy() { - uInfo("geos is cleaned up"); - int32_t size = taosArrayGetSize(sGeosPool.poolArray); - for (int32_t i = 0; i < size; ++i) { - SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i); - int32_t poolSize = i == size - 1 ? sGeosPool.size : GEOS_POOL_CAPACITY; - for (int32_t j = 0; j < poolSize; ++j) { - destroyGeosCtx(pool + j); - } - taosMemoryFree(pool); - } - taosArrayDestroy(sGeosPool.poolArray); - sGeosPool.poolArray = NULL; -} \ No newline at end of file diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 55a927f340..6779e8dee5 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -178,6 +178,8 @@ void *taosProcessSchedQueue(void *scheduler) { (*(msg.tfp))(msg.ahandle, msg.thandle); } + destroyThreadLocalGeosCtx(); + return NULL; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index ebec134c91..b2064d6787 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -105,6 +105,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } + destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; @@ -664,6 +665,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } + destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 2c8330c433..0ccbd683dc 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -611,14 +611,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width) code = initCtxAsText(); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(getGeosErrMsg(code), width); + shellPrintString(getThreadLocalGeosCtx()->errMsg, width); return; } char *outputWKT = NULL; code = doAsText(val, length, &outputWKT); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(getGeosErrMsg(code), width); // should NOT happen + shellPrintString(getThreadLocalGeosCtx()->errMsg, width); // should NOT happen return; } @@ -1282,6 +1282,7 @@ void *shellThreadLoop(void *arg) { taosResetTerminalMode(); } while (shellRunCommand(command, true) == 0); + destroyThreadLocalGeosCtx(); taosMemoryFreeClear(command); shellWriteHistory(); shellExit();