fix: memory leak of geos
This commit is contained in:
parent
641fccaa93
commit
b072a83bbd
|
@ -32,14 +32,16 @@ typedef struct SGeosContext {
|
|||
GEOSWKBReader *WKBReader;
|
||||
GEOSWKBWriter *WKBWriter;
|
||||
|
||||
pcre2_code *WKTRegex;
|
||||
pcre2_code *WKTRegex;
|
||||
pcre2_match_data *WKTMatchData;
|
||||
|
||||
char errMsg[512];
|
||||
} SGeosContext;
|
||||
|
||||
SGeosContext* getThreadLocalGeosCtx();
|
||||
void destroyThreadLocalGeosCtx();
|
||||
SGeosContext *acquireThreadLocalGeosCtx();
|
||||
SGeosContext *getThreadLocalGeosCtx();
|
||||
const char *getGeosErrMsg(int32_t code);
|
||||
void taosGeosDestroy();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "query.h"
|
||||
#include "scheduler.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tgeosctx.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
|
@ -86,6 +87,7 @@ void taos_cleanup(void) {
|
|||
tscDebug("rpc cleanup");
|
||||
|
||||
taosConvDestroy();
|
||||
taosGeosDestroy();
|
||||
|
||||
tmqMgmtClose();
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "index.h"
|
||||
#include "qworker.h"
|
||||
#include "tcompression.h"
|
||||
#include "tgeosctx.h"
|
||||
#include "tglobal.h"
|
||||
#include "tgrant.h"
|
||||
#include "tstream.h"
|
||||
|
@ -121,6 +122,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
|||
streamMetaCleanup();
|
||||
indexCleanup();
|
||||
taosConvDestroy();
|
||||
taosGeosDestroy();
|
||||
|
||||
// compress destroy
|
||||
tsCompressExit();
|
||||
|
|
|
@ -979,7 +979,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in
|
|||
|
||||
if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) ||
|
||||
TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) {
|
||||
qError("geo text for systable failed:%s", getThreadLocalGeosCtx()->errMsg);
|
||||
qError("geo text for systable failed:%s", getGeosErrMsg(code));
|
||||
*output = NULL;
|
||||
*nOutput = 0;
|
||||
return code;
|
||||
|
|
|
@ -23,7 +23,8 @@ typedef char (*_geosPreparedRelationFunc_t)(GEOSContextHandle_t handle, const GE
|
|||
|
||||
void geosFreeBuffer(void *buffer) {
|
||||
if (buffer) {
|
||||
GEOSFree_r(getThreadLocalGeosCtx()->handle, buffer);
|
||||
SGeosContext *pCtx = acquireThreadLocalGeosCtx();
|
||||
if (pCtx) GEOSFree_r(pCtx->handle, buffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -418,7 +419,8 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
|
|||
}
|
||||
|
||||
void destroyGeometry(GEOSGeometry **geom, const GEOSPreparedGeometry **preparedGeom) {
|
||||
SGeosContext *geosCtx = getThreadLocalGeosCtx();
|
||||
SGeosContext *geosCtx = acquireThreadLocalGeosCtx();
|
||||
if (!geosCtx) return;
|
||||
|
||||
if (preparedGeom && *preparedGeom) {
|
||||
GEOSPreparedGeom_destroy_r(geosCtx->handle, *preparedGeom);
|
||||
|
|
|
@ -655,7 +655,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
|
|||
|
||||
code = parseGeometry(pToken, &output, &size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = buildSyntaxErrMsg(pMsgBuf, getThreadLocalGeosCtx()->errMsg, pToken->z);
|
||||
code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z);
|
||||
} else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
|
||||
// Too long values will raise the invalid sql error message
|
||||
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
|
||||
|
@ -1646,7 +1646,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
|
|||
|
||||
code = parseGeometry(pToken, &output, &size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, getThreadLocalGeosCtx()->errMsg, pToken->z);
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z);
|
||||
}
|
||||
// Too long values will raise the invalid sql error message
|
||||
else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
|
||||
|
|
|
@ -446,12 +446,12 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t
|
|||
unsigned char *t = NULL;
|
||||
char *output = NULL;
|
||||
|
||||
if (initCtxGeomFromText()) {
|
||||
sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg);
|
||||
if ((code = initCtxGeomFromText()) != 0) {
|
||||
sclError("failed to init geometry ctx, %s", getGeosErrMsg(code));
|
||||
SCL_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
if (doGeomFromText(buf, &t, &len)) {
|
||||
sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg);
|
||||
if ((code = doGeomFromText(buf, &t, &len)) != 0) {
|
||||
sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code));
|
||||
SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR);
|
||||
}
|
||||
|
||||
|
|
|
@ -14,39 +14,102 @@
|
|||
*/
|
||||
|
||||
#include "tgeosctx.h"
|
||||
#include "tarray.h"
|
||||
#include "tdef.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tlog.h"
|
||||
|
||||
static threadlocal SGeosContext tlGeosCtx = {0};
|
||||
#define GEOS_POOL_CAPACITY 64
|
||||
typedef struct {
|
||||
SArray *poolArray; // totalSize: (GEOS_POOL_CAPACITY * (taosArrayGetSize(poolArray) - 1)) + size
|
||||
SGeosContext *pool; // current SGeosContext pool
|
||||
int32_t size; // size of current SGeosContext pool, size <= GEOS_POOL_CAPACITY
|
||||
SRWLatch lock;
|
||||
} SGeosContextPool;
|
||||
|
||||
SGeosContext* getThreadLocalGeosCtx() { return &tlGeosCtx; }
|
||||
static SGeosContextPool sGeosPool = {0};
|
||||
static threadlocal SGeosContext *tlGeosCtx = NULL;
|
||||
|
||||
void destroyThreadLocalGeosCtx() {
|
||||
if (tlGeosCtx.WKTReader) {
|
||||
GEOSWKTReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTReader);
|
||||
tlGeosCtx.WKTReader = NULL;
|
||||
SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; }
|
||||
|
||||
SGeosContext *getThreadLocalGeosCtx() {
|
||||
if (tlGeosCtx) return tlGeosCtx;
|
||||
|
||||
taosWLockLatch(&sGeosPool.lock);
|
||||
if (!sGeosPool.pool || sGeosPool.size >= GEOS_POOL_CAPACITY) {
|
||||
if (!(sGeosPool.pool = (SGeosContext *)taosMemoryCalloc(GEOS_POOL_CAPACITY, sizeof(SGeosContext)))) {
|
||||
taosWUnLockLatch(&sGeosPool.lock);
|
||||
return NULL;
|
||||
}
|
||||
if (!sGeosPool.poolArray) {
|
||||
if (!(sGeosPool.poolArray = taosArrayInit(16, POINTER_BYTES))) {
|
||||
taosMemoryFreeClear(sGeosPool.pool);
|
||||
taosWUnLockLatch(&sGeosPool.lock);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
if (!taosArrayPush(sGeosPool.poolArray, &sGeosPool.pool)) {
|
||||
taosMemoryFreeClear(sGeosPool.pool);
|
||||
taosWUnLockLatch(&sGeosPool.lock);
|
||||
return NULL;
|
||||
}
|
||||
sGeosPool.size = 0;
|
||||
}
|
||||
tlGeosCtx = sGeosPool.pool + sGeosPool.size;
|
||||
++sGeosPool.size;
|
||||
taosWUnLockLatch(&sGeosPool.lock);
|
||||
|
||||
if (tlGeosCtx.WKTWriter) {
|
||||
GEOSWKTWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTWriter);
|
||||
tlGeosCtx.WKTWriter = NULL;
|
||||
}
|
||||
return tlGeosCtx;
|
||||
}
|
||||
|
||||
if (tlGeosCtx.WKBReader) {
|
||||
GEOSWKBReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBReader);
|
||||
tlGeosCtx.WKBReader = NULL;
|
||||
}
|
||||
const char *getGeosErrMsg(int32_t code) { return tlGeosCtx ? tlGeosCtx->errMsg : (code != 0 ? tstrerror(code) : ""); }
|
||||
|
||||
if (tlGeosCtx.WKBWriter) {
|
||||
GEOSWKBWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBWriter);
|
||||
tlGeosCtx.WKBWriter = NULL;
|
||||
}
|
||||
static void destroyGeosCtx(SGeosContext *pCtx) {
|
||||
if (pCtx) {
|
||||
if (pCtx->WKTReader) {
|
||||
GEOSWKTReader_destroy_r(pCtx->handle, pCtx->WKTReader);
|
||||
pCtx->WKTReader = NULL;
|
||||
}
|
||||
|
||||
if (tlGeosCtx.WKTRegex) {
|
||||
destroyRegexes(tlGeosCtx.WKTRegex, tlGeosCtx.WKTMatchData);
|
||||
}
|
||||
if (pCtx->WKTWriter) {
|
||||
GEOSWKTWriter_destroy_r(pCtx->handle, pCtx->WKTWriter);
|
||||
pCtx->WKTWriter = NULL;
|
||||
}
|
||||
|
||||
if (tlGeosCtx.handle) {
|
||||
GEOS_finish_r(tlGeosCtx.handle);
|
||||
tlGeosCtx.handle = NULL;
|
||||
if (pCtx->WKBReader) {
|
||||
GEOSWKBReader_destroy_r(pCtx->handle, pCtx->WKBReader);
|
||||
pCtx->WKBReader = NULL;
|
||||
}
|
||||
|
||||
if (pCtx->WKBWriter) {
|
||||
GEOSWKBWriter_destroy_r(pCtx->handle, pCtx->WKBWriter);
|
||||
pCtx->WKBWriter = NULL;
|
||||
}
|
||||
|
||||
if (pCtx->WKTRegex) {
|
||||
destroyRegexes(pCtx->WKTRegex, pCtx->WKTMatchData);
|
||||
pCtx->WKTRegex = NULL;
|
||||
pCtx->WKTMatchData = NULL;
|
||||
}
|
||||
|
||||
if (pCtx->handle) {
|
||||
GEOS_finish_r(pCtx->handle);
|
||||
pCtx->handle = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void taosGeosDestroy() {
|
||||
uInfo("geos is cleaned up");
|
||||
int32_t size = taosArrayGetSize(sGeosPool.poolArray);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SGeosContext *pool = *(SGeosContext **)TARRAY_GET_ELEM(sGeosPool.poolArray, i);
|
||||
int32_t poolSize = i == size - 1 ? sGeosPool.size : GEOS_POOL_CAPACITY;
|
||||
for (int32_t j = 0; j < poolSize; ++j) {
|
||||
destroyGeosCtx(pool + j);
|
||||
}
|
||||
taosMemoryFree(pool);
|
||||
}
|
||||
taosArrayDestroy(sGeosPool.poolArray);
|
||||
sGeosPool.poolArray = NULL;
|
||||
}
|
|
@ -178,7 +178,6 @@ void *taosProcessSchedQueue(void *scheduler) {
|
|||
(*(msg.tfp))(msg.ahandle, msg.thandle);
|
||||
}
|
||||
|
||||
destroyThreadLocalGeosCtx();
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -105,7 +105,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
|
|||
taosUpdateItemSize(qinfo.queue, 1);
|
||||
}
|
||||
|
||||
destroyThreadLocalGeosCtx();
|
||||
DestoryThreadLocalRegComp();
|
||||
|
||||
return NULL;
|
||||
|
@ -665,7 +664,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
|
|||
}
|
||||
}
|
||||
|
||||
destroyThreadLocalGeosCtx();
|
||||
DestoryThreadLocalRegComp();
|
||||
|
||||
return NULL;
|
||||
|
|
|
@ -611,14 +611,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width)
|
|||
|
||||
code = initCtxAsText();
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
shellPrintString(getThreadLocalGeosCtx()->errMsg, width);
|
||||
shellPrintString(destroyThreadLocalGeosCtx(), width);
|
||||
return;
|
||||
}
|
||||
|
||||
char *outputWKT = NULL;
|
||||
code = doAsText(val, length, &outputWKT);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
shellPrintString(getThreadLocalGeosCtx()->errMsg, width); // should NOT happen
|
||||
shellPrintString(destroyThreadLocalGeosCtx(), width); // should NOT happen
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1282,7 +1282,6 @@ void *shellThreadLoop(void *arg) {
|
|||
taosResetTerminalMode();
|
||||
} while (shellRunCommand(command, true) == 0);
|
||||
|
||||
destroyThreadLocalGeosCtx();
|
||||
taosMemoryFreeClear(command);
|
||||
shellWriteHistory();
|
||||
shellExit();
|
||||
|
|
Loading…
Reference in New Issue