From b072a83bbdb53ab2826e36100c5314d03e57eeab Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 21 Aug 2024 15:45:57 +0800 Subject: [PATCH 01/17] fix: memory leak of geos --- include/util/tgeosctx.h | 8 +- source/client/src/clientMain.c | 2 + source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 2 + source/libs/executor/src/sysscanoperator.c | 2 +- source/libs/geometry/src/geosWrapper.c | 6 +- source/libs/parser/src/parInsertSql.c | 4 +- source/libs/scalar/src/sclvector.c | 8 +- source/util/src/tgeosctx.c | 111 ++++++++++++++++----- source/util/src/tsched.c | 1 - source/util/src/tworker.c | 2 - tools/shell/src/shellEngine.c | 5 +- 11 files changed, 109 insertions(+), 42 deletions(-) diff --git a/include/util/tgeosctx.h b/include/util/tgeosctx.h index 267ba9e049..a4355db29a 100644 --- a/include/util/tgeosctx.h +++ b/include/util/tgeosctx.h @@ -32,14 +32,16 @@ typedef struct SGeosContext { GEOSWKBReader *WKBReader; GEOSWKBWriter *WKBWriter; - pcre2_code *WKTRegex; + pcre2_code *WKTRegex; pcre2_match_data *WKTMatchData; char errMsg[512]; } SGeosContext; -SGeosContext* getThreadLocalGeosCtx(); -void destroyThreadLocalGeosCtx(); +SGeosContext *acquireThreadLocalGeosCtx(); +SGeosContext *getThreadLocalGeosCtx(); +const char *getGeosErrMsg(int32_t code); +void taosGeosDestroy(); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index de56a4844a..996aef96ac 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -23,6 +23,7 @@ #include "query.h" #include "scheduler.h" #include "tdatablock.h" +#include "tgeosctx.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" @@ -86,6 +87,7 @@ void taos_cleanup(void) { tscDebug("rpc cleanup"); taosConvDestroy(); + taosGeosDestroy(); tmqMgmtClose(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index fdce9fd4c9..ae6efa7af4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -19,6 +19,7 @@ #include "index.h" #include "qworker.h" #include "tcompression.h" +#include "tgeosctx.h" #include "tglobal.h" #include "tgrant.h" #include "tstream.h" @@ -121,6 +122,7 @@ void dmCleanupDnode(SDnode *pDnode) { streamMetaCleanup(); indexCleanup(); taosConvDestroy(); + taosGeosDestroy(); // compress destroy tsCompressExit(); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index d8a2331980..082d4e7789 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -979,7 +979,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) || TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) { - qError("geo text for systable failed:%s", getThreadLocalGeosCtx()->errMsg); + qError("geo text for systable failed:%s", getGeosErrMsg(code)); *output = NULL; *nOutput = 0; return code; diff --git a/source/libs/geometry/src/geosWrapper.c b/source/libs/geometry/src/geosWrapper.c index dde34edc91..d83fc3b3a8 100644 --- a/source/libs/geometry/src/geosWrapper.c +++ b/source/libs/geometry/src/geosWrapper.c @@ -23,7 +23,8 @@ typedef char (*_geosPreparedRelationFunc_t)(GEOSContextHandle_t handle, const GE void geosFreeBuffer(void *buffer) { if (buffer) { - GEOSFree_r(getThreadLocalGeosCtx()->handle, buffer); + SGeosContext *pCtx = acquireThreadLocalGeosCtx(); + if (pCtx) GEOSFree_r(pCtx->handle, buffer); } } @@ -418,7 +419,8 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, } void destroyGeometry(GEOSGeometry **geom, const GEOSPreparedGeometry **preparedGeom) { - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = acquireThreadLocalGeosCtx(); + if (!geosCtx) return; if (preparedGeom && *preparedGeom) { GEOSPreparedGeom_destroy_r(geosCtx->handle, *preparedGeom); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index cb94cd42f7..aa6116287e 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -655,7 +655,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, code = parseGeometry(pToken, &output, &size); if (code != TSDB_CODE_SUCCESS) { - code = buildSyntaxErrMsg(pMsgBuf, getThreadLocalGeosCtx()->errMsg, pToken->z); + code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z); } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { // Too long values will raise the invalid sql error message code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); @@ -1646,7 +1646,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, code = parseGeometry(pToken, &output, &size); if (code != TSDB_CODE_SUCCESS) { - code = buildSyntaxErrMsg(&pCxt->msg, getThreadLocalGeosCtx()->errMsg, pToken->z); + code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z); } // Too long values will raise the invalid sql error message else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 5556108a52..ead6053505 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -446,12 +446,12 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t unsigned char *t = NULL; char *output = NULL; - if (initCtxGeomFromText()) { - sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg); + if ((code = initCtxGeomFromText()) != 0) { + sclError("failed to init geometry ctx, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_APP_ERROR); } - if (doGeomFromText(buf, &t, &len)) { - sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg); + if ((code = doGeomFromText(buf, &t, &len)) != 0) { + sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code)); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); } diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index a05734c911..82a360edd1 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -14,39 +14,102 @@ */ #include "tgeosctx.h" +#include "tarray.h" #include "tdef.h" +#include "tlockfree.h" +#include "tlog.h" -static threadlocal SGeosContext tlGeosCtx = {0}; +#define GEOS_POOL_CAPACITY 64 +typedef struct { + SArray *poolArray; // totalSize: (GEOS_POOL_CAPACITY * (taosArrayGetSize(poolArray) - 1)) + size + SGeosContext *pool; // current SGeosContext pool + int32_t size; // size of current SGeosContext pool, size <= GEOS_POOL_CAPACITY + SRWLatch lock; +} SGeosContextPool; -SGeosContext* getThreadLocalGeosCtx() { return &tlGeosCtx; } +static SGeosContextPool sGeosPool = {0}; +static threadlocal SGeosContext *tlGeosCtx = NULL; -void destroyThreadLocalGeosCtx() { - if (tlGeosCtx.WKTReader) { - GEOSWKTReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTReader); - tlGeosCtx.WKTReader = NULL; +SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } + +SGeosContext *getThreadLocalGeosCtx() { + if (tlGeosCtx) return tlGeosCtx; + + taosWLockLatch(&sGeosPool.lock); + if (!sGeosPool.pool || sGeosPool.size >= GEOS_POOL_CAPACITY) { + if (!(sGeosPool.pool = (SGeosContext *)taosMemoryCalloc(GEOS_POOL_CAPACITY, sizeof(SGeosContext)))) { + taosWUnLockLatch(&sGeosPool.lock); + return NULL; + } + if (!sGeosPool.poolArray) { + if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) { + taosMemoryFreeClear(sGeosPool.pool); + taosWUnLockLatch(&sGeosPool.lock); + return NULL; + } + } + if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) { + taosMemoryFreeClear(sGeosPool.pool); + taosWUnLockLatch(&sGeosPool.lock); + return NULL; + } + sGeosPool.size = 0; } + tlGeosCtx = sGeosPool.pool + sGeosPool.size; + ++sGeosPool.size; + taosWUnLockLatch(&sGeosPool.lock); - if (tlGeosCtx.WKTWriter) { - GEOSWKTWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTWriter); - tlGeosCtx.WKTWriter = NULL; - } + return tlGeosCtx; +} - if (tlGeosCtx.WKBReader) { - GEOSWKBReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBReader); - tlGeosCtx.WKBReader = NULL; - } +const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); } - if (tlGeosCtx.WKBWriter) { - GEOSWKBWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBWriter); - tlGeosCtx.WKBWriter = NULL; - } +static void destroyGeosCtx(SGeosContext *pCtx) { + if (pCtx) { + if (pCtx->WKTReader) { + GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader); + pCtx->WKTReader = NULL; + } - if (tlGeosCtx.WKTRegex) { - destroyRegexes(tlGeosCtx.WKTRegex, tlGeosCtx.WKTMatchData); - } + if (pCtx->WKTWriter) { + GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter); + pCtx->WKTWriter = NULL; + } - if (tlGeosCtx.handle) { - GEOS_finish_r(tlGeosCtx.handle); - tlGeosCtx.handle = NULL; + if (pCtx->WKBReader) { + GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader); + pCtx->WKBReader = NULL; + } + + if (pCtx->WKBWriter) { + GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter); + pCtx->WKBWriter = NULL; + } + + if (pCtx->WKTRegex) { + destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData); + pCtx->WKTRegex = NULL; + pCtx->WKTMatchData = NULL; + } + + if (pCtx->handle) { + GEOS_finish_r(pCtx->handle); + pCtx->handle = NULL; + } } } + +void taosGeosDestroy() { + uInfo("geos is cleaned up"); + int32_t size = taosArrayGetSize(sGeosPool.poolArray); + for (int32_t i = 0; i < size; ++i) { + SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i); + int32_t poolSize = i == size - 1 ? sGeosPool.size : GEOS_POOL_CAPACITY; + for (int32_t j = 0; j < poolSize; ++j) { + destroyGeosCtx(pool + j); + } + taosMemoryFree(pool); + } + taosArrayDestroy(sGeosPool.poolArray); + sGeosPool.poolArray = NULL; +} \ No newline at end of file diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 6779e8dee5..34c74660fc 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -178,7 +178,6 @@ void *taosProcessSchedQueue(void *scheduler) { (*(msg.tfp))(msg.ahandle, msg.thandle); } - destroyThreadLocalGeosCtx(); return NULL; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index b2064d6787..ebec134c91 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -105,7 +105,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); } - destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; @@ -665,7 +664,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } - destroyThreadLocalGeosCtx(); DestoryThreadLocalRegComp(); return NULL; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 0ccbd683dc..8f3e4d0d8d 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -611,14 +611,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width) code = initCtxAsText(); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(getThreadLocalGeosCtx()->errMsg, width); + shellPrintString(destroyThreadLocalGeosCtx(), width); return; } char *outputWKT = NULL; code = doAsText(val, length, &outputWKT); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(getThreadLocalGeosCtx()->errMsg, width); // should NOT happen + shellPrintString(destroyThreadLocalGeosCtx(), width); // should NOT happen return; } @@ -1282,7 +1282,6 @@ void *shellThreadLoop(void *arg) { taosResetTerminalMode(); } while (shellRunCommand(command, true) == 0); - destroyThreadLocalGeosCtx(); taosMemoryFreeClear(command); shellWriteHistory(); shellExit(); From 3dcd7ac371dfedc3de054e486fd5ea6d39bb4fb1 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 09:56:27 +0800 Subject: [PATCH 02/17] fix: memory leak of geos --- include/util/tgeosctx.h | 3 +- source/client/src/clientMain.c | 1 - source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 1 - source/libs/geometry/src/geosWrapper.c | 39 ++++-- source/util/src/tgeosctx.c | 148 ++++++++++------------- tools/shell/src/shellEngine.c | 4 +- 6 files changed, 95 insertions(+), 101 deletions(-) diff --git a/include/util/tgeosctx.h b/include/util/tgeosctx.h index a4355db29a..6c27d24dac 100644 --- a/include/util/tgeosctx.h +++ b/include/util/tgeosctx.h @@ -39,9 +39,8 @@ typedef struct SGeosContext { } SGeosContext; SGeosContext *acquireThreadLocalGeosCtx(); -SGeosContext *getThreadLocalGeosCtx(); +int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx); const char *getGeosErrMsg(int32_t code); -void taosGeosDestroy(); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 0d33ef955c..105072eb9b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -87,7 +87,6 @@ void taos_cleanup(void) { tscDebug("rpc cleanup"); taosConvDestroy(); - taosGeosDestroy(); tmqMgmtClose(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index ae6efa7af4..a593a36049 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -122,7 +122,6 @@ void dmCleanupDnode(SDnode *pDnode) { streamMetaCleanup(); indexCleanup(); taosConvDestroy(); - taosGeosDestroy(); // compress destroy tsCompressExit(); diff --git a/source/libs/geometry/src/geosWrapper.c b/source/libs/geometry/src/geosWrapper.c index d83fc3b3a8..b55ece4037 100644 --- a/source/libs/geometry/src/geosWrapper.c +++ b/source/libs/geometry/src/geosWrapper.c @@ -14,7 +14,7 @@ */ #include "geosWrapper.h" -#include "tdef.h" +#include "tutil.h" #include "types.h" typedef char (*_geosRelationFunc_t)(GEOSContextHandle_t handle, const GEOSGeometry *g1, const GEOSGeometry *g2); @@ -35,7 +35,9 @@ void geosErrMsgeHandler(const char *errMsg, void *userData) { int32_t initCtxMakePoint() { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -60,7 +62,9 @@ int32_t initCtxMakePoint() { // need to call geosFreeBuffer(*outputGeom) later int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size) { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -165,7 +169,9 @@ static int32_t initWktRegex(pcre2_code **ppRegex, pcre2_match_data **ppMatchData int32_t initCtxGeomFromText() { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -201,7 +207,9 @@ int32_t initCtxGeomFromText() { // need to call geosFreeBuffer(*outputGeom) later int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t *size) { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); GEOSGeometry *geom = NULL; unsigned char *wkb = NULL; @@ -236,7 +244,9 @@ _exit: int32_t initCtxAsText() { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -272,7 +282,9 @@ int32_t initCtxAsText() { // need to call geosFreeBuffer(*outputWKT) later int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); GEOSGeometry *geom = NULL; char *wkt = NULL; @@ -303,7 +315,9 @@ _exit: int32_t initCtxRelationFunc() { int32_t code = TSDB_CODE_FAILED; - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); if (geosCtx->handle == NULL) { geosCtx->handle = GEOS_init_r(); @@ -328,7 +342,9 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr bool swapped, char *res, _geosRelationFunc_t relationFn, _geosRelationFunc_t swappedRelationFn, _geosPreparedRelationFunc_t preparedRelationFn, _geosPreparedRelationFunc_t swappedPreparedRelationFn) { - SGeosContext *geosCtx = getThreadLocalGeosCtx(); + SGeosContext *geosCtx = NULL; + + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); if (!preparedGeom1) { if (!swapped) { @@ -390,8 +406,6 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry // need to call destroyGeometry(outputGeom, outputPreparedGeom) later int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, const GEOSPreparedGeometry **outputPreparedGeom) { - SGeosContext *geosCtx = getThreadLocalGeosCtx(); - ASSERT(outputGeom); // it is not allowed if outputGeom is NULL *outputGeom = NULL; @@ -403,6 +417,9 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, return TSDB_CODE_SUCCESS; } + SGeosContext *geosCtx = NULL; + TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); + *outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input)); if (*outputGeom == NULL) { return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 82a360edd1..2d083284e7 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -14,102 +14,82 @@ */ #include "tgeosctx.h" -#include "tarray.h" -#include "tdef.h" -#include "tlockfree.h" #include "tlog.h" +#include "tutil.h" -#define GEOS_POOL_CAPACITY 64 -typedef struct { - SArray *poolArray; // totalSize: (GEOS_POOL_CAPACITY * (taosArrayGetSize(poolArray) - 1)) + size - SGeosContext *pool; // current SGeosContext pool - int32_t size; // size of current SGeosContext pool, size <= GEOS_POOL_CAPACITY - SRWLatch lock; -} SGeosContextPool; - -static SGeosContextPool sGeosPool = {0}; +static threadlocal TdThreadKey tlGeosCtxKey = 0; +static threadlocal SGeosContext tlGeosCtxObj = {0}; static threadlocal SGeosContext *tlGeosCtx = NULL; +static void destroyThreadLocalGeosCtx(); + SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } -SGeosContext *getThreadLocalGeosCtx() { - if (tlGeosCtx) return tlGeosCtx; - - taosWLockLatch(&sGeosPool.lock); - if (!sGeosPool.pool || sGeosPool.size >= GEOS_POOL_CAPACITY) { - if (!(sGeosPool.pool = (SGeosContext *)taosMemoryCalloc(GEOS_POOL_CAPACITY, sizeof(SGeosContext)))) { - taosWUnLockLatch(&sGeosPool.lock); - return NULL; - } - if (!sGeosPool.poolArray) { - if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) { - taosMemoryFreeClear(sGeosPool.pool); - taosWUnLockLatch(&sGeosPool.lock); - return NULL; - } - } - if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) { - taosMemoryFreeClear(sGeosPool.pool); - taosWUnLockLatch(&sGeosPool.lock); - return NULL; - } - sGeosPool.size = 0; +int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { + if (tlGeosCtx) { + *ppCtx = tlGeosCtx; + return 0; } - tlGeosCtx = sGeosPool.pool + sGeosPool.size; - ++sGeosPool.size; - taosWUnLockLatch(&sGeosPool.lock); - return tlGeosCtx; + int32_t code = 0, lino = 0; + if ((taosThreadKeyCreate(&tlGeosCtxKey, destroyThreadLocalGeosCtx)) != 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } + + if ((taosThreadSetSpecific(tlGeosCtxKey, &tlGeosCtxObj)) != 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } + + tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey); + if (tlGeosCtx == NULL) { + if (errno) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } else { + TAOS_CHECK_EXIT(TSDB_CODE_NOT_FOUND); + } + } + *ppCtx = tlGeosCtx; +_exit: + if (code != 0) { + *ppCtx = NULL; + uError("failed to get thread local geos context at lino:%d since %s", lino, tstrerror(code)); + } + + TAOS_RETURN(code); } -const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); } +const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : code ? strerror(code) : ""; } -static void destroyGeosCtx(SGeosContext *pCtx) { - if (pCtx) { - if (pCtx->WKTReader) { - GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader); - pCtx->WKTReader = NULL; - } - - if (pCtx->WKTWriter) { - GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter); - pCtx->WKTWriter = NULL; - } - - if (pCtx->WKBReader) { - GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader); - pCtx->WKBReader = NULL; - } - - if (pCtx->WKBWriter) { - GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter); - pCtx->WKBWriter = NULL; - } - - if (pCtx->WKTRegex) { - destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData); - pCtx->WKTRegex = NULL; - pCtx->WKTMatchData = NULL; - } - - if (pCtx->handle) { - GEOS_finish_r(pCtx->handle); - pCtx->handle = NULL; - } +static void destroyThreadLocalGeosCtx(void *param) { + SGeosContext *tlGeosCtx = &tlGeosCtxObj; + if (tlGeosCtx->WKTReader) { + GEOSWKTReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTReader); + tlGeosCtx->WKTReader = NULL; } -} -void taosGeosDestroy() { - uInfo("geos is cleaned up"); - int32_t size = taosArrayGetSize(sGeosPool.poolArray); - for (int32_t i = 0; i < size; ++i) { - SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i); - int32_t poolSize = i == size - 1 ? sGeosPool.size : GEOS_POOL_CAPACITY; - for (int32_t j = 0; j < poolSize; ++j) { - destroyGeosCtx(pool + j); - } - taosMemoryFree(pool); + if (tlGeosCtx->WKTWriter) { + GEOSWKTWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTWriter); + tlGeosCtx->WKTWriter = NULL; + } + + if (tlGeosCtx->WKBReader) { + GEOSWKBReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBReader); + tlGeosCtx->WKBReader = NULL; + } + + if (tlGeosCtx->WKBWriter) { + GEOSWKBWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBWriter); + tlGeosCtx->WKBWriter = NULL; + } + + if (tlGeosCtx->WKTRegex) { + destroyRegexes(tlGeosCtx->WKTRegex, tlGeosCtx->WKTMatchData); + tlGeosCtx->WKTRegex = NULL; + tlGeosCtx->WKTMatchData = NULL; + } + + if (tlGeosCtx->handle) { + GEOS_finish_r(tlGeosCtx->handle); + tlGeosCtx->handle = NULL; } - taosArrayDestroy(sGeosPool.poolArray); - sGeosPool.poolArray = NULL; } \ No newline at end of file diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 3f638197c0..7fdfd29542 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -613,14 +613,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width) code = initCtxAsText(); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(destroyThreadLocalGeosCtx(), width); + shellPrintString(getGeosErrMsg(code), width); return; } char *outputWKT = NULL; code = doAsText(val, length, &outputWKT); if (code != TSDB_CODE_SUCCESS) { - shellPrintString(destroyThreadLocalGeosCtx(), width); // should NOT happen + shellPrintString(getGeosErrMsg(code), width); // should NOT happen return; } From 95fd9f1f6f322156ecb81b433e1b153e549fdd42 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 09:59:39 +0800 Subject: [PATCH 03/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 70 ++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 2d083284e7..e0986ad18c 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -21,7 +21,39 @@ static threadlocal TdThreadKey tlGeosCtxKey = 0; static threadlocal SGeosContext tlGeosCtxObj = {0}; static threadlocal SGeosContext *tlGeosCtx = NULL; -static void destroyThreadLocalGeosCtx(); +static void destroyThreadLocalGeosCtx(void *param) { + SGeosContext *tlGeosCtx = &tlGeosCtxObj; + if (tlGeosCtx->WKTReader) { + GEOSWKTReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTReader); + tlGeosCtx->WKTReader = NULL; + } + + if (tlGeosCtx->WKTWriter) { + GEOSWKTWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTWriter); + tlGeosCtx->WKTWriter = NULL; + } + + if (tlGeosCtx->WKBReader) { + GEOSWKBReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBReader); + tlGeosCtx->WKBReader = NULL; + } + + if (tlGeosCtx->WKBWriter) { + GEOSWKBWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBWriter); + tlGeosCtx->WKBWriter = NULL; + } + + if (tlGeosCtx->WKTRegex) { + destroyRegexes(tlGeosCtx->WKTRegex, tlGeosCtx->WKTMatchData); + tlGeosCtx->WKTRegex = NULL; + tlGeosCtx->WKTMatchData = NULL; + } + + if (tlGeosCtx->handle) { + GEOS_finish_r(tlGeosCtx->handle); + tlGeosCtx->handle = NULL; + } +} SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } @@ -58,38 +90,4 @@ _exit: TAOS_RETURN(code); } -const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : code ? strerror(code) : ""; } - -static void destroyThreadLocalGeosCtx(void *param) { - SGeosContext *tlGeosCtx = &tlGeosCtxObj; - if (tlGeosCtx->WKTReader) { - GEOSWKTReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTReader); - tlGeosCtx->WKTReader = NULL; - } - - if (tlGeosCtx->WKTWriter) { - GEOSWKTWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTWriter); - tlGeosCtx->WKTWriter = NULL; - } - - if (tlGeosCtx->WKBReader) { - GEOSWKBReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBReader); - tlGeosCtx->WKBReader = NULL; - } - - if (tlGeosCtx->WKBWriter) { - GEOSWKBWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBWriter); - tlGeosCtx->WKBWriter = NULL; - } - - if (tlGeosCtx->WKTRegex) { - destroyRegexes(tlGeosCtx->WKTRegex, tlGeosCtx->WKTMatchData); - tlGeosCtx->WKTRegex = NULL; - tlGeosCtx->WKTMatchData = NULL; - } - - if (tlGeosCtx->handle) { - GEOS_finish_r(tlGeosCtx->handle); - tlGeosCtx->handle = NULL; - } -} \ No newline at end of file +const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : code ? strerror(code) : ""; } \ No newline at end of file From 11d25078c0154f43e1279d756c02f7a8f1d0a85b Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 10:04:32 +0800 Subject: [PATCH 04/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index e0986ad18c..bfaed88671 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -84,7 +84,7 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { _exit: if (code != 0) { *ppCtx = NULL; - uError("failed to get thread local geos context at lino:%d since %s", lino, tstrerror(code)); + uError("failed to get geos context at lino:%d since %s", lino, tstrerror(code)); } TAOS_RETURN(code); From 6cd7fd0288f901f853a4b820348f86e3fa10c14e Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 10:19:16 +0800 Subject: [PATCH 05/17] fix: memory leak of geos --- source/client/src/clientMain.c | 1 - source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 1 - source/libs/geometry/src/geosWrapper.c | 1 - source/util/src/tgeosctx.c | 10 +--------- 4 files changed, 1 insertion(+), 12 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 105072eb9b..55401d7eb2 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -24,7 +24,6 @@ #include "scheduler.h" #include "tcompare.h" #include "tdatablock.h" -#include "tgeosctx.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index a593a36049..fdce9fd4c9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -19,7 +19,6 @@ #include "index.h" #include "qworker.h" #include "tcompression.h" -#include "tgeosctx.h" #include "tglobal.h" #include "tgrant.h" #include "tstream.h" diff --git a/source/libs/geometry/src/geosWrapper.c b/source/libs/geometry/src/geosWrapper.c index b55ece4037..b362faa28c 100644 --- a/source/libs/geometry/src/geosWrapper.c +++ b/source/libs/geometry/src/geosWrapper.c @@ -419,7 +419,6 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, SGeosContext *geosCtx = NULL; TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx)); - *outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input)); if (*outputGeom == NULL) { return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index bfaed88671..e8015f691a 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -72,21 +72,13 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } - tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey); - if (tlGeosCtx == NULL) { - if (errno) { - TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); - } else { - TAOS_CHECK_EXIT(TSDB_CODE_NOT_FOUND); - } - } + tlGeosCtx = &tlGeosCtxObj; *ppCtx = tlGeosCtx; _exit: if (code != 0) { *ppCtx = NULL; uError("failed to get geos context at lino:%d since %s", lino, tstrerror(code)); } - TAOS_RETURN(code); } From 3c7ec707b2ba23e15f587fecc4882f78f0655359 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 10:36:19 +0800 Subject: [PATCH 06/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index e8015f691a..76a81aa901 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -82,4 +82,6 @@ _exit: TAOS_RETURN(code); } -const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : code ? strerror(code) : ""; } \ No newline at end of file +const char *getGeosErrMsg(int32_t code) { + return (tlGeosCtx && tlGeosCtx->errMsg[0] != 0) ? tlGeosCtx->errMsg : (code ? tstrerror(code) : ""); +} \ No newline at end of file From a4e6444646c1929492184534ee7c3a5b106f9ad6 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 10:37:29 +0800 Subject: [PATCH 07/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 76a81aa901..9248c8cf58 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -84,4 +84,4 @@ _exit: const char *getGeosErrMsg(int32_t code) { return (tlGeosCtx && tlGeosCtx->errMsg[0] != 0) ? tlGeosCtx->errMsg : (code ? tstrerror(code) : ""); -} \ No newline at end of file +} From 825db81bef66005d633d7dbd5402f83acc020319 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 10:43:40 +0800 Subject: [PATCH 08/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 9248c8cf58..41d9bb9ae3 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -68,10 +68,6 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } - if ((taosThreadSetSpecific(tlGeosCtxKey, &tlGeosCtxObj)) != 0) { - TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); - } - tlGeosCtx = &tlGeosCtxObj; *ppCtx = tlGeosCtx; _exit: From 401673318c9efb5115a9594412e0312051987e88 Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 24 Aug 2024 14:30:36 +0800 Subject: [PATCH 09/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 41d9bb9ae3..9248c8cf58 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -68,6 +68,10 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } + if ((taosThreadSetSpecific(tlGeosCtxKey, &tlGeosCtxObj)) != 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } + tlGeosCtx = &tlGeosCtxObj; *ppCtx = tlGeosCtx; _exit: From b456d6cfcb0b07a2f2c6806077de049297d8f614 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 25 Aug 2024 19:56:47 +0800 Subject: [PATCH 10/17] Update tgeosctx.c --- source/util/src/tgeosctx.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 9248c8cf58..65c3da2d44 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -19,7 +19,6 @@ static threadlocal TdThreadKey tlGeosCtxKey = 0; static threadlocal SGeosContext tlGeosCtxObj = {0}; -static threadlocal SGeosContext *tlGeosCtx = NULL; static void destroyThreadLocalGeosCtx(void *param) { SGeosContext *tlGeosCtx = &tlGeosCtxObj; @@ -55,11 +54,10 @@ static void destroyThreadLocalGeosCtx(void *param) { } } -SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } +SGeosContext *acquireThreadLocalGeosCtx() { return taosThreadGetSpecific(tlGeosCtxKey); } int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { - if (tlGeosCtx) { - *ppCtx = tlGeosCtx; + if ((*ppCtx = taosThreadGetSpecific(tlGeosCtxKey))) { return 0; } @@ -72,8 +70,13 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } - tlGeosCtx = &tlGeosCtxObj; - *ppCtx = tlGeosCtx; + if (!(*ppCtx = taosThreadGetSpecific(tlGeosCtxKey))) { + if (errno != 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } else { + TAOS_CHECK_EXIT(TSDB_CODE_NOT_FOUND); + } + } _exit: if (code != 0) { *ppCtx = NULL; @@ -83,5 +86,6 @@ _exit: } const char *getGeosErrMsg(int32_t code) { + SGeosContext *tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey) return (tlGeosCtx && tlGeosCtx->errMsg[0] != 0) ? tlGeosCtx->errMsg : (code ? tstrerror(code) : ""); } From 095fd4efc31e255fc9cedeb4465692b23fb3058a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 25 Aug 2024 20:02:29 +0800 Subject: [PATCH 11/17] Update tgeosctx.c --- source/util/src/tgeosctx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 65c3da2d44..2cef6c0d9f 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -86,6 +86,6 @@ _exit: } const char *getGeosErrMsg(int32_t code) { - SGeosContext *tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey) + SGeosContext *tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey); return (tlGeosCtx && tlGeosCtx->errMsg[0] != 0) ? tlGeosCtx->errMsg : (code ? tstrerror(code) : ""); } From d9883ac0578ba8d6b36d9167718f9a5a6099b581 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 26 Aug 2024 09:35:44 +0800 Subject: [PATCH 12/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 67 ++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 2cef6c0d9f..d87dfebaf7 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -18,46 +18,50 @@ #include "tutil.h" static threadlocal TdThreadKey tlGeosCtxKey = 0; -static threadlocal SGeosContext tlGeosCtxObj = {0}; +static threadlocal SGeosContext *tlGeosCtx = NULL; static void destroyThreadLocalGeosCtx(void *param) { - SGeosContext *tlGeosCtx = &tlGeosCtxObj; - if (tlGeosCtx->WKTReader) { - GEOSWKTReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTReader); - tlGeosCtx->WKTReader = NULL; + SGeosContext *pGeosCtx = (SGeosContext *)param; + if (!pGeosCtx) { + return; + } + if (pGeosCtx->WKTReader) { + GEOSWKTReader_destroy_r(pGeosCtx->handle, pGeosCtx->WKTReader); + pGeosCtx->WKTReader = NULL; } - if (tlGeosCtx->WKTWriter) { - GEOSWKTWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTWriter); - tlGeosCtx->WKTWriter = NULL; + if (pGeosCtx->WKTWriter) { + GEOSWKTWriter_destroy_r(pGeosCtx->handle, pGeosCtx->WKTWriter); + pGeosCtx->WKTWriter = NULL; } - if (tlGeosCtx->WKBReader) { - GEOSWKBReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBReader); - tlGeosCtx->WKBReader = NULL; + if (pGeosCtx->WKBReader) { + GEOSWKBReader_destroy_r(pGeosCtx->handle, pGeosCtx->WKBReader); + pGeosCtx->WKBReader = NULL; } - if (tlGeosCtx->WKBWriter) { - GEOSWKBWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBWriter); - tlGeosCtx->WKBWriter = NULL; + if (pGeosCtx->WKBWriter) { + GEOSWKBWriter_destroy_r(pGeosCtx->handle, pGeosCtx->WKBWriter); + pGeosCtx->WKBWriter = NULL; } - if (tlGeosCtx->WKTRegex) { - destroyRegexes(tlGeosCtx->WKTRegex, tlGeosCtx->WKTMatchData); - tlGeosCtx->WKTRegex = NULL; - tlGeosCtx->WKTMatchData = NULL; + if (pGeosCtx->WKTRegex) { + destroyRegexes(pGeosCtx->WKTRegex, pGeosCtx->WKTMatchData); + pGeosCtx->WKTRegex = NULL; + pGeosCtx->WKTMatchData = NULL; } - if (tlGeosCtx->handle) { - GEOS_finish_r(tlGeosCtx->handle); - tlGeosCtx->handle = NULL; + if (pGeosCtx->handle) { + GEOS_finish_r(pGeosCtx->handle); + pGeosCtx->handle = NULL; } + taosMemoryFree(pGeosCtx); } -SGeosContext *acquireThreadLocalGeosCtx() { return taosThreadGetSpecific(tlGeosCtxKey); } +SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; } int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { - if ((*ppCtx = taosThreadGetSpecific(tlGeosCtxKey))) { + if ((*ppCtx = tlGeosCtx)) { return 0; } @@ -66,17 +70,17 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } - if ((taosThreadSetSpecific(tlGeosCtxKey, &tlGeosCtxObj)) != 0) { + SGeosContext *tlGeosCtxObj = (SGeosContext *)taosMemoryCalloc(1, sizeof(SGeosContext)); + if (!tlGeosCtxObj) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } + if ((taosThreadSetSpecific(tlGeosCtxKey, (const void *)tlGeosCtxObj)) != 0) { + taosMemoryFreeClear(tlGeosCtxObj); TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } - if (!(*ppCtx = taosThreadGetSpecific(tlGeosCtxKey))) { - if (errno != 0) { - TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); - } else { - TAOS_CHECK_EXIT(TSDB_CODE_NOT_FOUND); - } - } + *ppCtx = tlGeosCtx = tlGeosCtxObj; + _exit: if (code != 0) { *ppCtx = NULL; @@ -86,6 +90,5 @@ _exit: } const char *getGeosErrMsg(int32_t code) { - SGeosContext *tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey); return (tlGeosCtx && tlGeosCtx->errMsg[0] != 0) ? tlGeosCtx->errMsg : (code ? tstrerror(code) : ""); } From 8b2af4f09d68dfa183d2e3681632767dd58cd6cd Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 28 Aug 2024 19:12:23 +0800 Subject: [PATCH 13/17] fix: memory leak of geos --- include/os/osThread.h | 10 +++++- source/libs/geometry/test/geomIoFuncTest.cpp | 24 ++++++++------ .../geometry/test/geomRelationFuncTest.cpp | 24 ++++++++------ source/os/src/osThread.c | 32 +++++++++++++++++++ source/util/src/tgeosctx.c | 6 ++++ 5 files changed, 77 insertions(+), 19 deletions(-) diff --git a/include/os/osThread.h b/include/os/osThread.h index 38c1b366f0..f39c01abee 100644 --- a/include/os/osThread.h +++ b/include/os/osThread.h @@ -22,8 +22,12 @@ extern "C" { #endif -#if defined(WINDOWS) && !defined(__USE_PTHREAD) +#if defined(WINDOWS) +#include #include +#endif + +#if defined(WINDOWS) && !defined(__USE_PTHREAD) #define __USE_WIN_THREAD // https://learn.microsoft.com/en-us/windows/win32/winprog/using-the-windows-headers // #ifndef _WIN32_WINNT @@ -275,6 +279,10 @@ int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock); void taosThreadTestCancel(void); void taosThreadClear(TdThread *thread); +#ifdef WINDOWS +bool taosThreadIsMain(); +#endif + #ifdef __cplusplus } #endif diff --git a/source/libs/geometry/test/geomIoFuncTest.cpp b/source/libs/geometry/test/geomIoFuncTest.cpp index 62fc90cb9e..2eff1bf0ea 100644 --- a/source/libs/geometry/test/geomIoFuncTest.cpp +++ b/source/libs/geometry/test/geomIoFuncTest.cpp @@ -320,20 +320,26 @@ void geomIoFuncTestAsTextFunction() { callAsTextWrapper2(TSDB_DATA_TYPE_GEOMETRY, strInput, valTypeArray, 1, TSDB_CODE_FUNC_FUNTION_PARA_VALUE); } -static void *geomIoFuncTest(void *arg) { +static void geomIoFuncTestImpl() { geomIoFuncTestMakePointFunctionTwoColumns(); geomIoFuncTestMakePointFunctionConstant(); geomIoFuncTestMakePointFunctionWithNull(); geomIoFuncTestGeomFromTextFunction(); geomIoFuncTestAsTextFunction(); - - return NULL; } -TEST(GeomIoFuncTest, summary) { - TdThread threadId; - int32_t ret = taosThreadCreate(&threadId, NULL, geomIoFuncTest, 0); - ASSERT_EQ(ret, 0); - ret = taosThreadJoin(threadId, NULL); - ASSERT_EQ(ret, 0); +static void *geomIoFuncTestFunc(void *arg) { + geomIoFuncTestImpl(); + return nullptr; +} + +static void geomIoFuncTestInThread() { + TdThread thread; + ASSERT_EQ(taosThreadCreate(&thread, nullptr, geomIoFuncTestFunc, NULL), 0); + ASSERT_EQ(taosThreadJoin(thread, nullptr), 0); +} + +TEST(threadGeomFuncTest, threadFuncTest) { + geomIoFuncTestImpl(); + geomIoFuncTestInThread(); } \ No newline at end of file diff --git a/source/libs/geometry/test/geomRelationFuncTest.cpp b/source/libs/geometry/test/geomRelationFuncTest.cpp index 82bc3a4055..4563d443cb 100644 --- a/source/libs/geometry/test/geomRelationFuncTest.cpp +++ b/source/libs/geometry/test/geomRelationFuncTest.cpp @@ -238,21 +238,27 @@ void geomRelationFuncTestContainsProperlyFunction() { geomRelationFuncTest(containsProperlyFunction, expectedResults); } -static void *geomRelationFuncTest(void *arg) { +static void geomRelationFuncTestImpl() { geomRelationFuncTestIntersectsFunction(); geomRelationFuncTestEqualsFunction(); geomRelationFuncTestTouchesFunction(); geomRelationFuncTestCoversFunction(); geomRelationFuncTestContainsFunction(); geomRelationFuncTestContainsProperlyFunction(); - - return NULL; } -TEST(GeomRelationFuncTest, summary) { - TdThread threadId; - int32_t ret = taosThreadCreate(&threadId, NULL, geomRelationFuncTest, 0); - ASSERT_EQ(ret, 0); - ret = taosThreadJoin(threadId, NULL); - ASSERT_EQ(ret, 0); +static void *geomRelationFuncTestFunc(void *arg) { + geomRelationFuncTestImpl(); + return nullptr; } + +static void geomRelationFuncTestInThread() { + TdThread thread; + ASSERT_EQ(taosThreadCreate(&thread, nullptr, geomRelationFuncTestFunc, NULL), 0); + ASSERT_EQ(taosThreadJoin(thread, nullptr), 0); +} + +TEST(threadGeomRelationFuncTest, threadGeomRelationFuncTest) { + geomRelationFuncTestImpl(); + geomRelationFuncTestInThread(); +} \ No newline at end of file diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 5a24e7775f..1fc48f145b 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -841,3 +841,35 @@ void taosThreadTestCancel(void) { void taosThreadClear(TdThread *thread) { (void)memset(thread, 0, sizeof(TdThread)); } + +#ifdef WINDOWS +bool taosThreadIsMain() { + DWORD curProcessId = GetCurrentProcessId(); + DWORD curThreadId = GetCurrentThreadId(); + DWORD dwThreadId = -1; + + HANDLE hThreadSnapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0); + if (hThreadSnapshot == INVALID_HANDLE_VALUE) { + return false; + } + + THREADENTRY32 te32; + te32.dwSize = sizeof(THREADENTRY32); + + if (!Thread32First(hThreadSnapshot, &te32)) { + CloseHandle(hThreadSnapshot); + return false; + } + + do { + if (te32.th32OwnerProcessID == curProcessId) { + dwThreadId = te32.th32ThreadID; + break; + } + } while (Thread32Next(hThreadSnapshot, &te32)); + + CloseHandle(hThreadSnapshot); + + return curThreadId == dwThreadId; +} +#endif diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 9d8f64e01a..9c4f78684d 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -21,6 +21,10 @@ static TdThreadKey tlGeosCtxKey = 0; static threadlocal SGeosContext *tlGeosCtx = NULL; static void destroyThreadLocalGeosCtx(void *param) { +#ifdef WINDOWS + if (taosThreadIsMain()) return; +#endif + SGeosContext *pGeosCtx = (SGeosContext *)param; if (!pGeosCtx) { return; @@ -72,9 +76,11 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { SGeosContext *tlGeosCtxObj = (SGeosContext *)taosMemoryCalloc(1, sizeof(SGeosContext)); if (!tlGeosCtxObj) { + taosThreadKeyDelete(tlGeosCtxKey); TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } if ((taosThreadSetSpecific(tlGeosCtxKey, (const void *)tlGeosCtxObj)) != 0) { + taosThreadKeyDelete(tlGeosCtxKey); taosMemoryFreeClear(tlGeosCtxObj); TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } From 06d60bfce6891c6fe5c1b403d46113b6d1e9a1ea Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Wed, 28 Aug 2024 19:15:28 +0800 Subject: [PATCH 14/17] Update osThread.h --- include/os/osThread.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/os/osThread.h b/include/os/osThread.h index f39c01abee..73c499b1dc 100644 --- a/include/os/osThread.h +++ b/include/os/osThread.h @@ -22,7 +22,7 @@ extern "C" { #endif -#if defined(WINDOWS) +#ifdef WINDOWS #include #include #endif From b8dcb50c86f7237c953f233d99aacccab45b7c9b Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 28 Aug 2024 19:53:50 +0800 Subject: [PATCH 15/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 9c4f78684d..362159d42a 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -17,7 +17,9 @@ #include "tlog.h" #include "tutil.h" -static TdThreadKey tlGeosCtxKey = 0; +static TdThreadKey tlGeosCtxKey = 0; +static int8_t tlGeosCtxKeyInited = 0; + static threadlocal SGeosContext *tlGeosCtx = NULL; static void destroyThreadLocalGeosCtx(void *param) { @@ -70,17 +72,18 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { } int32_t code = 0, lino = 0; - if ((taosThreadKeyCreate(&tlGeosCtxKey, destroyThreadLocalGeosCtx)) != 0) { - TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + if (atomic_val_compare_exchange_8(&tlGeosCtxKeyInited, 0, 1) == 0) { + if ((taosThreadKeyCreate(&tlGeosCtxKey, destroyThreadLocalGeosCtx)) != 0) { + atomic_store_8(&tlGeosCtxKeyInited, 0); + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } } SGeosContext *tlGeosCtxObj = (SGeosContext *)taosMemoryCalloc(1, sizeof(SGeosContext)); if (!tlGeosCtxObj) { - taosThreadKeyDelete(tlGeosCtxKey); TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } if ((taosThreadSetSpecific(tlGeosCtxKey, (const void *)tlGeosCtxObj)) != 0) { - taosThreadKeyDelete(tlGeosCtxKey); taosMemoryFreeClear(tlGeosCtxObj); TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } From 203c04ca2dcc596cbb4fa679444c754e4d3272bd Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 28 Aug 2024 19:57:49 +0800 Subject: [PATCH 16/17] fix: memory leak of geos --- source/util/src/tgeosctx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tgeosctx.c b/source/util/src/tgeosctx.c index 362159d42a..805d398275 100644 --- a/source/util/src/tgeosctx.c +++ b/source/util/src/tgeosctx.c @@ -93,7 +93,7 @@ int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) { _exit: if (code != 0) { *ppCtx = NULL; - uError("failed to get geos context at lino:%d since %s", lino, tstrerror(code)); + uError("failed to get geos context at line:%d since %s", lino, tstrerror(code)); } TAOS_RETURN(code); } From b357d3da2a808adfe5920bb2491105cd1a110117 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 29 Aug 2024 14:47:03 +0800 Subject: [PATCH 17/17] enh: (s3) continue to check other s3 cfg after an error occurs --- source/common/src/cos.c | 55 ++++++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 3b5fca0fea..250a4815f4 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -72,23 +72,42 @@ void s3CleanUp() { /*s3End();*/ static int32_t s3ListBucket(char const *bucketname); +static void s3DumpCfgByEp(int8_t epIndex) { + // clang-format off + (void)fprintf(stdout, + "%-24s %s\n" + "%-24s %s\n" + "%-24s %s\n" + "%-24s %s\n" + "%-24s %s\n" + "%-24s %s\n", + "hostName", tsS3Hostname[epIndex], + "bucketName", tsS3BucketName, + "protocol", (protocolG[epIndex] == S3ProtocolHTTPS ? "https" : "http"), + "uristyle", (uriStyleG[epIndex] == S3UriStyleVirtualHost ? "virtualhost" : "path"), + "accessKey", tsS3AccessKeyId[epIndex], + "accessKeySecret", tsS3AccessKeySecret[epIndex]); + // clang-format on +} + int32_t s3CheckCfg() { int32_t code = 0, lino = 0; int8_t i = 0; if (!tsS3Enabled) { (void)fprintf(stderr, "s3 not configured.\n"); - goto _exit; + TAOS_RETURN(code); } code = s3Begin(); if (code != 0) { (void)fprintf(stderr, "failed to initialize s3.\n"); - TAOS_CHECK_GOTO(code, &lino, _exit); + TAOS_RETURN(code); } for (; i < tsS3EpNum; i++) { - (void)fprintf(stdout, "test s3 ep: %d/%d.\n", i + 1, tsS3EpNum); + (void)fprintf(stdout, "test s3 ep (%d/%d):\n", i + 1, tsS3EpNum); + s3DumpCfgByEp(i); // test put char testdata[17] = "0123456789abcdef"; @@ -109,15 +128,15 @@ int32_t s3CheckCfg() { if (!fp) { (void)fprintf(stderr, "failed to open test file: %s.\n", path); // uError("ERROR: %s Failed to open %s", __func__, path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _next); } if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { (void)fprintf(stderr, "failed to write test file: %s.\n", path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _next); } if (taosFsyncFile(fp) < 0) { (void)fprintf(stderr, "failed to fsync test file: %s.\n", path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _next); } (void)taosCloseFile(&fp); @@ -125,7 +144,7 @@ int32_t s3CheckCfg() { code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i); if (code != 0) { (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); + TAOS_CHECK_GOTO(code, &lino, _next); } (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); @@ -134,7 +153,7 @@ int32_t s3CheckCfg() { code = s3ListBucketByEp(tsS3BucketName, i); if (code != 0) { (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName); - TAOS_CHECK_GOTO(code, &lino, _exit); + TAOS_CHECK_GOTO(code, &lino, _next); } (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName); @@ -147,7 +166,7 @@ int32_t s3CheckCfg() { code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i); if (code != 0) { (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); + TAOS_CHECK_GOTO(code, &lino, _next); } char buf[7] = {0}; (void)memcpy(buf, pBlock, c_len); @@ -160,18 +179,24 @@ int32_t s3CheckCfg() { code = s3DeleteObjectsByEp(objectname, 1, i); if (code != 0) { (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); + TAOS_CHECK_GOTO(code, &lino, _next); } (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); + + _next: + if (fp) { + (void)taosCloseFile(&fp); + } + + if (TSDB_CODE_SUCCESS != code) { + (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i); + } + + (void)fprintf(stdout, "=================================================================\n"); } s3End(); -_exit: - if (TSDB_CODE_SUCCESS != code) { - (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i); - } - TAOS_RETURN(code); }