fix: memory leak of geos

This commit is contained in:
kailixu 2024-08-19 17:27:15 +08:00
parent 2457c61411
commit 800e7c4e7a
5 changed files with 75 additions and 27 deletions

View File

@ -23,6 +23,7 @@
#include "query.h" #include "query.h"
#include "scheduler.h" #include "scheduler.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tgeosctx.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
@ -94,6 +95,7 @@ void taos_cleanup(void) {
tmqMgmtClose(); tmqMgmtClose();
DestroyRegexCache(); DestroyRegexCache();
destroyThreadLocalGeosCtx();
tscInfo("all local resources released"); tscInfo("all local resources released");
taosCleanupCfg(); taosCleanupCfg();

View File

@ -19,6 +19,7 @@
#include "index.h" #include "index.h"
#include "qworker.h" #include "qworker.h"
#include "tcompression.h" #include "tcompression.h"
#include "tgeosctx.h"
#include "tglobal.h" #include "tglobal.h"
#include "tgrant.h" #include "tgrant.h"
#include "tstream.h" #include "tstream.h"
@ -121,6 +122,7 @@ void dmCleanupDnode(SDnode *pDnode) {
streamMetaCleanup(); streamMetaCleanup();
indexCleanup(); indexCleanup();
taosConvDestroy(); taosConvDestroy();
destroyThreadLocalGeosCtx();
// compress destroy // compress destroy
tsCompressExit(); tsCompressExit();

View File

@ -15,38 +15,82 @@
#include "tgeosctx.h" #include "tgeosctx.h"
#include "tdef.h" #include "tdef.h"
#include "tlockfree.h"
#include "tlog.h"
static threadlocal SGeosContext tlGeosCtx = {0}; typedef struct {
SGeosContext *pool;
int32_t capacity;
int32_t size;
SRWLatch lock;
} SGeosContextPool;
SGeosContext* getThreadLocalGeosCtx() { return &tlGeosCtx; } static SGeosContextPool sGeosPool = {0};
void destroyThreadLocalGeosCtx() { static threadlocal SGeosContext *tlGeosCtx = NULL;
if (tlGeosCtx.WKTReader) {
GEOSWKTReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTReader); SGeosContext *getThreadLocalGeosCtx() {
tlGeosCtx.WKTReader = NULL; if (tlGeosCtx) return tlGeosCtx;
taosWLockLatch(&sGeosPool.lock);
if (sGeosPool.size >= sGeosPool.capacity) {
sGeosPool.capacity += 64;
void *tmp = taosMemoryRealloc(sGeosPool.pool, sGeosPool.capacity * sizeof(SGeosContext));
if (!tmp) {
taosWUnLockLatch(&sGeosPool.lock);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
sGeosPool.pool = tmp;
TAOS_MEMSET(sGeosPool.pool + sGeosPool.size, 0, (sGeosPool.capacity - sGeosPool.size) * sizeof(SGeosContext));
}
tlGeosCtx = sGeosPool.pool + sGeosPool.size;
++sGeosPool.size;
taosWUnLockLatch(&sGeosPool.lock);
return tlGeosCtx;
}
static void destroyGeosCtx(SGeosContext *pCtx) {
if (pCtx) {
if (pCtx->WKTReader) {
GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader);
pCtx->WKTReader = NULL;
} }
if (tlGeosCtx.WKTWriter) { if (pCtx->WKTWriter) {
GEOSWKTWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTWriter); GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter);
tlGeosCtx.WKTWriter = NULL; pCtx->WKTWriter = NULL;
} }
if (tlGeosCtx.WKBReader) { if (pCtx->WKBReader) {
GEOSWKBReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBReader); GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader);
tlGeosCtx.WKBReader = NULL; pCtx->WKBReader = NULL;
} }
if (tlGeosCtx.WKBWriter) { if (pCtx->WKBWriter) {
GEOSWKBWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBWriter); GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter);
tlGeosCtx.WKBWriter = NULL; pCtx->WKBWriter = NULL;
} }
if (tlGeosCtx.WKTRegex) { if (pCtx->WKTRegex) {
destroyRegexes(tlGeosCtx.WKTRegex, tlGeosCtx.WKTMatchData); destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData);
pCtx->WKTRegex = NULL;
pCtx->WKTMatchData = NULL;
} }
if (tlGeosCtx.handle) { if (pCtx->handle) {
GEOS_finish_r(tlGeosCtx.handle); GEOS_finish_r(pCtx->handle);
tlGeosCtx.handle = NULL; pCtx->handle = NULL;
}
} }
} }
void destroyThreadLocalGeosCtx() {
uInfo("geos ctx is cleaned up");
if (!sGeosPool.pool) return;
for (int32_t i = 0; i < sGeosPool.size; ++i) {
destroyGeosCtx(sGeosPool.pool + i);
}
taosMemoryFreeClear(sGeosPool.pool);
}

View File

@ -178,7 +178,7 @@ void *taosProcessSchedQueue(void *scheduler) {
(*(msg.tfp))(msg.ahandle, msg.thandle); (*(msg.tfp))(msg.ahandle, msg.thandle);
} }
destroyThreadLocalGeosCtx(); // destroyThreadLocalGeosCtx();
return NULL; return NULL;
} }

View File

@ -105,7 +105,7 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1); taosUpdateItemSize(qinfo.queue, 1);
} }
destroyThreadLocalGeosCtx(); // destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp(); DestoryThreadLocalRegComp();
return NULL; return NULL;
@ -665,7 +665,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
} }
} }
destroyThreadLocalGeosCtx(); // destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp(); DestoryThreadLocalRegComp();
return NULL; return NULL;