fix: memory leak of geos

This commit is contained in:
kailixu 2024-08-19 18:36:39 +08:00
parent 800e7c4e7a
commit 0531a4f4bd
11 changed files with 71 additions and 26 deletions

View File

@ -38,8 +38,10 @@ typedef struct SGeosContext {
char errMsg[512]; char errMsg[512];
} SGeosContext; } SGeosContext;
SGeosContext* getThreadLocalGeosCtx(); SGeosContext *acquireThreadLocalGeosCtx();
void destroyThreadLocalGeosCtx(); SGeosContext *getThreadLocalGeosCtx();
const char *getGeosErrMsg(int32_t code);
void taosGeosDestroy();
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -91,11 +91,11 @@ void taos_cleanup(void) {
tscDebug("rpc cleanup"); tscDebug("rpc cleanup");
taosConvDestroy(); taosConvDestroy();
taosGeosDestroy();
tmqMgmtClose(); tmqMgmtClose();
DestroyRegexCache(); DestroyRegexCache();
destroyThreadLocalGeosCtx();
tscInfo("all local resources released"); tscInfo("all local resources released");
taosCleanupCfg(); taosCleanupCfg();

View File

@ -122,7 +122,7 @@ void dmCleanupDnode(SDnode *pDnode) {
streamMetaCleanup(); streamMetaCleanup();
indexCleanup(); indexCleanup();
taosConvDestroy(); taosConvDestroy();
destroyThreadLocalGeosCtx(); taosGeosDestroy();
// compress destroy // compress destroy
tsCompressExit(); tsCompressExit();

View File

@ -979,7 +979,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in
if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) || if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) ||
TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) { TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) {
qError("geo text for systable failed:%s", getThreadLocalGeosCtx()->errMsg); qError("geo text for systable failed:%s", getGeosErrMsg(code));
*output = NULL; *output = NULL;
*nOutput = 0; *nOutput = 0;
return code; return code;

View File

@ -23,7 +23,8 @@ typedef char (*_geosPreparedRelationFunc_t)(GEOSContextHandle_t handle, const GE
void geosFreeBuffer(void *buffer) { void geosFreeBuffer(void *buffer) {
if (buffer) { if (buffer) {
GEOSFree_r(getThreadLocalGeosCtx()->handle, buffer); SGeosContext *pCtx = acquireThreadLocalGeosCtx();
if (pCtx) GEOSFree_r(pCtx->handle, buffer);
} }
} }
@ -36,6 +37,11 @@ int32_t initCtxMakePoint() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
@ -61,6 +67,11 @@ int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
GEOSGeometry *geom = NULL; GEOSGeometry *geom = NULL;
unsigned char *wkb = NULL; unsigned char *wkb = NULL;
@ -166,6 +177,11 @@ int32_t initCtxGeomFromText() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
@ -202,6 +218,11 @@ int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
GEOSGeometry *geom = NULL; GEOSGeometry *geom = NULL;
unsigned char *wkb = NULL; unsigned char *wkb = NULL;
@ -237,6 +258,11 @@ int32_t initCtxAsText() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
@ -273,6 +299,11 @@ int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT)
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
GEOSGeometry *geom = NULL; GEOSGeometry *geom = NULL;
char *wkt = NULL; char *wkt = NULL;
@ -304,6 +335,11 @@ int32_t initCtxRelationFunc() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
@ -329,6 +365,10 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr
_geosPreparedRelationFunc_t swappedPreparedRelationFn) { _geosPreparedRelationFunc_t swappedPreparedRelationFn) {
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!preparedGeom1) { if (!preparedGeom1) {
if (!swapped) { if (!swapped) {
ASSERT(relationFn); ASSERT(relationFn);
@ -390,6 +430,9 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry
int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
const GEOSPreparedGeometry **outputPreparedGeom) { const GEOSPreparedGeometry **outputPreparedGeom) {
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!geosCtx) {
return TSDB_CODE_OUT_OF_MEMORY;
}
ASSERT(outputGeom); // it is not allowed if outputGeom is NULL ASSERT(outputGeom); // it is not allowed if outputGeom is NULL
*outputGeom = NULL; *outputGeom = NULL;

View File

@ -655,7 +655,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
code = parseGeometry(pToken, &output, &size); code = parseGeometry(pToken, &output, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = buildSyntaxErrMsg(pMsgBuf, getThreadLocalGeosCtx()->errMsg, pToken->z); code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z);
} else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
// Too long values will raise the invalid sql error message // Too long values will raise the invalid sql error message
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
@ -1646,7 +1646,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
code = parseGeometry(pToken, &output, &size); code = parseGeometry(pToken, &output, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = buildSyntaxErrMsg(&pCxt->msg, getThreadLocalGeosCtx()->errMsg, pToken->z); code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z);
} }
// Too long values will raise the invalid sql error message // Too long values will raise the invalid sql error message
else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {

View File

@ -441,12 +441,12 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t
unsigned char *t = NULL; unsigned char *t = NULL;
char *output = NULL; char *output = NULL;
if (initCtxGeomFromText()) { if ((code = initCtxGeomFromText()) != 0) {
sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg); sclError("failed to init geometry ctx, %s", getGeosErrMsg(code));
SCL_ERR_JRET(TSDB_CODE_APP_ERROR); SCL_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
if (doGeomFromText(buf, &t, &len)) { if ((code = doGeomFromText(buf, &t, &len))) {
sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg); sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code));
SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR);
} }

View File

@ -26,15 +26,18 @@ typedef struct {
} SGeosContextPool; } SGeosContextPool;
static SGeosContextPool sGeosPool = {0}; static SGeosContextPool sGeosPool = {0};
static threadlocal SGeosContext *tlGeosCtx = NULL; static threadlocal SGeosContext *tlGeosCtx = NULL;
SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; }
SGeosContext *getThreadLocalGeosCtx() { SGeosContext *getThreadLocalGeosCtx() {
if (tlGeosCtx) return tlGeosCtx; if (tlGeosCtx) {
return tlGeosCtx;
}
taosWLockLatch(&sGeosPool.lock); taosWLockLatch(&sGeosPool.lock);
if (sGeosPool.size >= sGeosPool.capacity) { if (sGeosPool.size >= sGeosPool.capacity) {
sGeosPool.capacity += 64; sGeosPool.capacity += 128;
void *tmp = taosMemoryRealloc(sGeosPool.pool, sGeosPool.capacity * sizeof(SGeosContext)); void *tmp = taosMemoryRealloc(sGeosPool.pool, sGeosPool.capacity * sizeof(SGeosContext));
if (!tmp) { if (!tmp) {
taosWUnLockLatch(&sGeosPool.lock); taosWUnLockLatch(&sGeosPool.lock);
@ -51,6 +54,8 @@ SGeosContext *getThreadLocalGeosCtx() {
return tlGeosCtx; return tlGeosCtx;
} }
const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); }
static void destroyGeosCtx(SGeosContext *pCtx) { static void destroyGeosCtx(SGeosContext *pCtx) {
if (pCtx) { if (pCtx) {
if (pCtx->WKTReader) { if (pCtx->WKTReader) {
@ -86,8 +91,8 @@ static void destroyGeosCtx(SGeosContext *pCtx) {
} }
} }
void destroyThreadLocalGeosCtx() { void taosGeosDestroy() {
uInfo("geos ctx is cleaned up"); uInfo("geos is cleaned up");
if (!sGeosPool.pool) return; if (!sGeosPool.pool) return;
for (int32_t i = 0; i < sGeosPool.size; ++i) { for (int32_t i = 0; i < sGeosPool.size; ++i) {
destroyGeosCtx(sGeosPool.pool + i); destroyGeosCtx(sGeosPool.pool + i);

View File

@ -178,8 +178,6 @@ void *taosProcessSchedQueue(void *scheduler) {
(*(msg.tfp))(msg.ahandle, msg.thandle); (*(msg.tfp))(msg.ahandle, msg.thandle);
} }
// destroyThreadLocalGeosCtx();
return NULL; return NULL;
} }

View File

@ -105,7 +105,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1); taosUpdateItemSize(qinfo.queue, 1);
} }
// destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp(); DestoryThreadLocalRegComp();
return NULL; return NULL;
@ -665,7 +664,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
} }
} }
// destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp(); DestoryThreadLocalRegComp();
return NULL; return NULL;

View File

@ -611,14 +611,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width)
code = initCtxAsText(); code = initCtxAsText();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
shellPrintString(getThreadLocalGeosCtx()->errMsg, width); shellPrintString(getGeosErrMsg(code), width);
return; return;
} }
char *outputWKT = NULL; char *outputWKT = NULL;
code = doAsText(val, length, &outputWKT); code = doAsText(val, length, &outputWKT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
shellPrintString(getThreadLocalGeosCtx()->errMsg, width); // should NOT happen shellPrintString(getGeosErrMsg(code), width); // should NOT happen
return; return;
} }
@ -1282,7 +1282,6 @@ void *shellThreadLoop(void *arg) {
taosResetTerminalMode(); taosResetTerminalMode();
} while (shellRunCommand(command, true) == 0); } while (shellRunCommand(command, true) == 0);
destroyThreadLocalGeosCtx();
taosMemoryFreeClear(command); taosMemoryFreeClear(command);
shellWriteHistory(); shellWriteHistory();
shellExit(); shellExit();