fix: memory leak of geos

This commit is contained in:
kailixu 2024-08-24 09:56:27 +08:00
parent 7ec5ecf8fc
commit 3dcd7ac371
6 changed files with 95 additions and 101 deletions

View File

@ -39,9 +39,8 @@ typedef struct SGeosContext {
} SGeosContext;
SGeosContext *acquireThreadLocalGeosCtx();
SGeosContext *getThreadLocalGeosCtx();
int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx);
const char *getGeosErrMsg(int32_t code);
void taosGeosDestroy();
#ifdef __cplusplus
}

View File

@ -87,7 +87,6 @@ void taos_cleanup(void) {
tscDebug("rpc cleanup");
taosConvDestroy();
taosGeosDestroy();
tmqMgmtClose();

View File

@ -122,7 +122,6 @@ void dmCleanupDnode(SDnode *pDnode) {
streamMetaCleanup();
indexCleanup();
taosConvDestroy();
taosGeosDestroy();
// compress destroy
tsCompressExit();

View File

@ -14,7 +14,7 @@
*/
#include "geosWrapper.h"
#include "tdef.h"
#include "tutil.h"
#include "types.h"
typedef char (*_geosRelationFunc_t)(GEOSContextHandle_t handle, const GEOSGeometry *g1, const GEOSGeometry *g2);
@ -35,7 +35,9 @@ void geosErrMsgeHandler(const char *errMsg, void *userData) {
int32_t initCtxMakePoint() {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r();
@ -60,7 +62,9 @@ int32_t initCtxMakePoint() {
// need to call geosFreeBuffer(*outputGeom) later
int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size) {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
GEOSGeometry *geom = NULL;
unsigned char *wkb = NULL;
@ -165,7 +169,9 @@ static int32_t initWktRegex(pcre2_code **ppRegex, pcre2_match_data **ppMatchData
int32_t initCtxGeomFromText() {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r();
@ -201,7 +207,9 @@ int32_t initCtxGeomFromText() {
// need to call geosFreeBuffer(*outputGeom) later
int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t *size) {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
GEOSGeometry *geom = NULL;
unsigned char *wkb = NULL;
@ -236,7 +244,9 @@ _exit:
int32_t initCtxAsText() {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r();
@ -272,7 +282,9 @@ int32_t initCtxAsText() {
// need to call geosFreeBuffer(*outputWKT) later
int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
GEOSGeometry *geom = NULL;
char *wkt = NULL;
@ -303,7 +315,9 @@ _exit:
int32_t initCtxRelationFunc() {
int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r();
@ -328,7 +342,9 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr
bool swapped, char *res, _geosRelationFunc_t relationFn, _geosRelationFunc_t swappedRelationFn,
_geosPreparedRelationFunc_t preparedRelationFn,
_geosPreparedRelationFunc_t swappedPreparedRelationFn) {
SGeosContext *geosCtx = getThreadLocalGeosCtx();
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (!preparedGeom1) {
if (!swapped) {
@ -390,8 +406,6 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry
// need to call destroyGeometry(outputGeom, outputPreparedGeom) later
int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
const GEOSPreparedGeometry **outputPreparedGeom) {
SGeosContext *geosCtx = getThreadLocalGeosCtx();
ASSERT(outputGeom); // it is not allowed if outputGeom is NULL
*outputGeom = NULL;
@ -403,6 +417,9 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
return TSDB_CODE_SUCCESS;
}
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
*outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input));
if (*outputGeom == NULL) {
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;

View File

@ -14,102 +14,82 @@
*/
#include "tgeosctx.h"
#include "tarray.h"
#include "tdef.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tutil.h"
#define GEOS_POOL_CAPACITY 64
typedef struct {
SArray *poolArray; // totalSize: (GEOS_POOL_CAPACITY * (taosArrayGetSize(poolArray) - 1)) + size
SGeosContext *pool; // current SGeosContext pool
int32_t size; // size of current SGeosContext pool, size <= GEOS_POOL_CAPACITY
SRWLatch lock;
} SGeosContextPool;
static SGeosContextPool sGeosPool = {0};
static threadlocal TdThreadKey tlGeosCtxKey = 0;
static threadlocal SGeosContext tlGeosCtxObj = {0};
static threadlocal SGeosContext *tlGeosCtx = NULL;
static void destroyThreadLocalGeosCtx();
SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; }
SGeosContext *getThreadLocalGeosCtx() {
if (tlGeosCtx) return tlGeosCtx;
taosWLockLatch(&sGeosPool.lock);
if (!sGeosPool.pool || sGeosPool.size >= GEOS_POOL_CAPACITY) {
if (!(sGeosPool.pool = (SGeosContext *)taosMemoryCalloc(GEOS_POOL_CAPACITY, sizeof(SGeosContext)))) {
taosWUnLockLatch(&sGeosPool.lock);
return NULL;
}
if (!sGeosPool.poolArray) {
if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) {
taosMemoryFreeClear(sGeosPool.pool);
taosWUnLockLatch(&sGeosPool.lock);
return NULL;
}
}
if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) {
taosMemoryFreeClear(sGeosPool.pool);
taosWUnLockLatch(&sGeosPool.lock);
return NULL;
}
sGeosPool.size = 0;
int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) {
if (tlGeosCtx) {
*ppCtx = tlGeosCtx;
return 0;
}
tlGeosCtx = sGeosPool.pool + sGeosPool.size;
++sGeosPool.size;
taosWUnLockLatch(&sGeosPool.lock);
return tlGeosCtx;
int32_t code = 0, lino = 0;
if ((taosThreadKeyCreate(&tlGeosCtxKey, destroyThreadLocalGeosCtx)) != 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
if ((taosThreadSetSpecific(tlGeosCtxKey, &tlGeosCtxObj)) != 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
tlGeosCtx = taosThreadGetSpecific(tlGeosCtxKey);
if (tlGeosCtx == NULL) {
if (errno) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
} else {
TAOS_CHECK_EXIT(TSDB_CODE_NOT_FOUND);
}
}
*ppCtx = tlGeosCtx;
_exit:
if (code != 0) {
*ppCtx = NULL;
uError("failed to get thread local geos context at lino:%d since %s", lino, tstrerror(code));
}
TAOS_RETURN(code);
}
const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); }
const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : code ? strerror(code) : ""; }
static void destroyGeosCtx(SGeosContext *pCtx) {
if (pCtx) {
if (pCtx->WKTReader) {
GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader);
pCtx->WKTReader = NULL;
}
if (pCtx->WKTWriter) {
GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter);
pCtx->WKTWriter = NULL;
}
if (pCtx->WKBReader) {
GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader);
pCtx->WKBReader = NULL;
}
if (pCtx->WKBWriter) {
GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter);
pCtx->WKBWriter = NULL;
}
if (pCtx->WKTRegex) {
destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData);
pCtx->WKTRegex = NULL;
pCtx->WKTMatchData = NULL;
}
if (pCtx->handle) {
GEOS_finish_r(pCtx->handle);
pCtx->handle = NULL;
}
static void destroyThreadLocalGeosCtx(void *param) {
SGeosContext *tlGeosCtx = &tlGeosCtxObj;
if (tlGeosCtx->WKTReader) {
GEOSWKTReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTReader);
tlGeosCtx->WKTReader = NULL;
}
}
void taosGeosDestroy() {
uInfo("geos is cleaned up");
int32_t size = taosArrayGetSize(sGeosPool.poolArray);
for (int32_t i = 0; i < size; ++i) {
SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i);
int32_t poolSize = i == size - 1 ? sGeosPool.size : GEOS_POOL_CAPACITY;
for (int32_t j = 0; j < poolSize; ++j) {
destroyGeosCtx(pool + j);
}
taosMemoryFree(pool);
if (tlGeosCtx->WKTWriter) {
GEOSWKTWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKTWriter);
tlGeosCtx->WKTWriter = NULL;
}
if (tlGeosCtx->WKBReader) {
GEOSWKBReader_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBReader);
tlGeosCtx->WKBReader = NULL;
}
if (tlGeosCtx->WKBWriter) {
GEOSWKBWriter_destroy_r(tlGeosCtx->handle, tlGeosCtx->WKBWriter);
tlGeosCtx->WKBWriter = NULL;
}
if (tlGeosCtx->WKTRegex) {
destroyRegexes(tlGeosCtx->WKTRegex, tlGeosCtx->WKTMatchData);
tlGeosCtx->WKTRegex = NULL;
tlGeosCtx->WKTMatchData = NULL;
}
if (tlGeosCtx->handle) {
GEOS_finish_r(tlGeosCtx->handle);
tlGeosCtx->handle = NULL;
}
taosArrayDestroy(sGeosPool.poolArray);
sGeosPool.poolArray = NULL;
}

View File

@ -613,14 +613,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width)
code = initCtxAsText();
if (code != TSDB_CODE_SUCCESS) {
shellPrintString(destroyThreadLocalGeosCtx(), width);
shellPrintString(getGeosErrMsg(code), width);
return;
}
char *outputWKT = NULL;
code = doAsText(val, length, &outputWKT);
if (code != TSDB_CODE_SUCCESS) {
shellPrintString(destroyThreadLocalGeosCtx(), width); // should NOT happen
shellPrintString(getGeosErrMsg(code), width); // should NOT happen
return;
}