Merge branch '3.0' of github.com:taosdata/TDengine into fix/ly_res

This commit is contained in:
54liuyao 2024-08-29 16:20:25 +08:00
commit 694a454f40
14 changed files with 258 additions and 77 deletions

View File

@ -22,8 +22,12 @@
extern "C" { extern "C" {
#endif #endif
#if defined(WINDOWS) && !defined(__USE_PTHREAD) #ifdef WINDOWS
#include <tlhelp32.h>
#include <windows.h> #include <windows.h>
#endif
#if defined(WINDOWS) && !defined(__USE_PTHREAD)
#define __USE_WIN_THREAD #define __USE_WIN_THREAD
// https://learn.microsoft.com/en-us/windows/win32/winprog/using-the-windows-headers // https://learn.microsoft.com/en-us/windows/win32/winprog/using-the-windows-headers
// #ifndef _WIN32_WINNT // #ifndef _WIN32_WINNT
@ -275,6 +279,10 @@ int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock);
void taosThreadTestCancel(void); void taosThreadTestCancel(void);
void taosThreadClear(TdThread *thread); void taosThreadClear(TdThread *thread);
#ifdef WINDOWS
bool taosThreadIsMain();
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -38,8 +38,9 @@ typedef struct SGeosContext {
char errMsg[512]; char errMsg[512];
} SGeosContext; } SGeosContext;
SGeosContext* getThreadLocalGeosCtx(); SGeosContext *acquireThreadLocalGeosCtx();
void destroyThreadLocalGeosCtx(); int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx);
const char *getGeosErrMsg(int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -72,23 +72,42 @@ void s3CleanUp() { /*s3End();*/
static int32_t s3ListBucket(char const *bucketname); static int32_t s3ListBucket(char const *bucketname);
static void s3DumpCfgByEp(int8_t epIndex) {
// clang-format off
(void)fprintf(stdout,
"%-24s %s\n"
"%-24s %s\n"
"%-24s %s\n"
"%-24s %s\n"
"%-24s %s\n"
"%-24s %s\n",
"hostName", tsS3Hostname[epIndex],
"bucketName", tsS3BucketName,
"protocol", (protocolG[epIndex] == S3ProtocolHTTPS ? "https" : "http"),
"uristyle", (uriStyleG[epIndex] == S3UriStyleVirtualHost ? "virtualhost" : "path"),
"accessKey", tsS3AccessKeyId[epIndex],
"accessKeySecret", tsS3AccessKeySecret[epIndex]);
// clang-format on
}
int32_t s3CheckCfg() { int32_t s3CheckCfg() {
int32_t code = 0, lino = 0; int32_t code = 0, lino = 0;
int8_t i = 0; int8_t i = 0;
if (!tsS3Enabled) { if (!tsS3Enabled) {
(void)fprintf(stderr, "s3 not configured.\n"); (void)fprintf(stderr, "s3 not configured.\n");
goto _exit; TAOS_RETURN(code);
} }
code = s3Begin(); code = s3Begin();
if (code != 0) { if (code != 0) {
(void)fprintf(stderr, "failed to initialize s3.\n"); (void)fprintf(stderr, "failed to initialize s3.\n");
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_RETURN(code);
} }
for (; i < tsS3EpNum; i++) { for (; i < tsS3EpNum; i++) {
(void)fprintf(stdout, "test s3 ep: %d/%d.\n", i + 1, tsS3EpNum); (void)fprintf(stdout, "test s3 ep (%d/%d):\n", i + 1, tsS3EpNum);
s3DumpCfgByEp(i);
// test put // test put
char testdata[17] = "0123456789abcdef"; char testdata[17] = "0123456789abcdef";
@ -109,15 +128,15 @@ int32_t s3CheckCfg() {
if (!fp) { if (!fp) {
(void)fprintf(stderr, "failed to open test file: %s.\n", path); (void)fprintf(stderr, "failed to open test file: %s.\n", path);
// uError("ERROR: %s Failed to open %s", __func__, path); // uError("ERROR: %s Failed to open %s", __func__, path);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _next);
} }
if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
(void)fprintf(stderr, "failed to write test file: %s.\n", path); (void)fprintf(stderr, "failed to write test file: %s.\n", path);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _next);
} }
if (taosFsyncFile(fp) < 0) { if (taosFsyncFile(fp) < 0) {
(void)fprintf(stderr, "failed to fsync test file: %s.\n", path); (void)fprintf(stderr, "failed to fsync test file: %s.\n", path);
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _next);
} }
(void)taosCloseFile(&fp); (void)taosCloseFile(&fp);
@ -125,7 +144,7 @@ int32_t s3CheckCfg() {
code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i); code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i);
if (code != 0) { if (code != 0) {
(void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]);
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _next);
} }
(void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
@ -134,7 +153,7 @@ int32_t s3CheckCfg() {
code = s3ListBucketByEp(tsS3BucketName, i); code = s3ListBucketByEp(tsS3BucketName, i);
if (code != 0) { if (code != 0) {
(void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName); (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _next);
} }
(void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName); (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);
@ -147,7 +166,7 @@ int32_t s3CheckCfg() {
code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i); code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i);
if (code != 0) { if (code != 0) {
(void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]);
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _next);
} }
char buf[7] = {0}; char buf[7] = {0};
(void)memcpy(buf, pBlock, c_len); (void)memcpy(buf, pBlock, c_len);
@ -160,18 +179,24 @@ int32_t s3CheckCfg() {
code = s3DeleteObjectsByEp(objectname, 1, i); code = s3DeleteObjectsByEp(objectname, 1, i);
if (code != 0) { if (code != 0) {
(void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _next);
} }
(void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
_next:
if (fp) {
(void)taosCloseFile(&fp);
} }
s3End();
_exit:
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
(void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i); (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i);
} }
(void)fprintf(stdout, "=================================================================\n");
}
s3End();
TAOS_RETURN(code); TAOS_RETURN(code);
} }

View File

@ -1012,7 +1012,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in
if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) || if (TSDB_CODE_SUCCESS != (code = initCtxAsText()) ||
TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) { TSDB_CODE_SUCCESS != (code = doAsText(iGeom, nGeom, &outputWKT))) {
qError("geo text for systable failed:%s", getThreadLocalGeosCtx()->errMsg); qError("geo text for systable failed:%s", getGeosErrMsg(code));
*output = NULL; *output = NULL;
*nOutput = 0; *nOutput = 0;
return code; return code;

View File

@ -14,7 +14,7 @@
*/ */
#include "geosWrapper.h" #include "geosWrapper.h"
#include "tdef.h" #include "tutil.h"
#include "types.h" #include "types.h"
typedef char (*_geosRelationFunc_t)(GEOSContextHandle_t handle, const GEOSGeometry *g1, const GEOSGeometry *g2); typedef char (*_geosRelationFunc_t)(GEOSContextHandle_t handle, const GEOSGeometry *g1, const GEOSGeometry *g2);
@ -23,7 +23,8 @@ typedef char (*_geosPreparedRelationFunc_t)(GEOSContextHandle_t handle, const GE
void geosFreeBuffer(void *buffer) { void geosFreeBuffer(void *buffer) {
if (buffer) { if (buffer) {
GEOSFree_r(getThreadLocalGeosCtx()->handle, buffer); SGeosContext *pCtx = acquireThreadLocalGeosCtx();
if (pCtx) GEOSFree_r(pCtx->handle, buffer);
} }
} }
@ -34,7 +35,9 @@ void geosErrMsgeHandler(const char *errMsg, void *userData) {
int32_t initCtxMakePoint() { int32_t initCtxMakePoint() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
@ -59,7 +62,9 @@ int32_t initCtxMakePoint() {
// need to call geosFreeBuffer(*outputGeom) later // need to call geosFreeBuffer(*outputGeom) later
int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size) { int32_t doMakePoint(double x, double y, unsigned char **outputGeom, size_t *size) {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
GEOSGeometry *geom = NULL; GEOSGeometry *geom = NULL;
unsigned char *wkb = NULL; unsigned char *wkb = NULL;
@ -164,7 +169,9 @@ static int32_t initWktRegex(pcre2_code **ppRegex, pcre2_match_data **ppMatchData
int32_t initCtxGeomFromText() { int32_t initCtxGeomFromText() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
@ -200,7 +207,9 @@ int32_t initCtxGeomFromText() {
// need to call geosFreeBuffer(*outputGeom) later // need to call geosFreeBuffer(*outputGeom) later
int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t *size) { int32_t doGeomFromText(const char *inputWKT, unsigned char **outputGeom, size_t *size) {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
GEOSGeometry *geom = NULL; GEOSGeometry *geom = NULL;
unsigned char *wkb = NULL; unsigned char *wkb = NULL;
@ -235,7 +244,9 @@ _exit:
int32_t initCtxAsText() { int32_t initCtxAsText() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
@ -271,7 +282,9 @@ int32_t initCtxAsText() {
// need to call geosFreeBuffer(*outputWKT) later // need to call geosFreeBuffer(*outputWKT) later
int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) { int32_t doAsText(const unsigned char *inputGeom, size_t size, char **outputWKT) {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
GEOSGeometry *geom = NULL; GEOSGeometry *geom = NULL;
char *wkt = NULL; char *wkt = NULL;
@ -302,7 +315,9 @@ _exit:
int32_t initCtxRelationFunc() { int32_t initCtxRelationFunc() {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (geosCtx->handle == NULL) { if (geosCtx->handle == NULL) {
geosCtx->handle = GEOS_init_r(); geosCtx->handle = GEOS_init_r();
@ -327,7 +342,9 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr
bool swapped, char *res, _geosRelationFunc_t relationFn, _geosRelationFunc_t swappedRelationFn, bool swapped, char *res, _geosRelationFunc_t relationFn, _geosRelationFunc_t swappedRelationFn,
_geosPreparedRelationFunc_t preparedRelationFn, _geosPreparedRelationFunc_t preparedRelationFn,
_geosPreparedRelationFunc_t swappedPreparedRelationFn) { _geosPreparedRelationFunc_t swappedPreparedRelationFn) {
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
if (!preparedGeom1) { if (!preparedGeom1) {
if (!swapped) { if (!swapped) {
@ -397,11 +414,10 @@ int32_t doContainsProperly(const GEOSGeometry *geom1, const GEOSPreparedGeometry
// need to call destroyGeometry(outputGeom, outputPreparedGeom) later // need to call destroyGeometry(outputGeom, outputPreparedGeom) later
int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom, int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
const GEOSPreparedGeometry **outputPreparedGeom) { const GEOSPreparedGeometry **outputPreparedGeom) {
SGeosContext *geosCtx = getThreadLocalGeosCtx();
if (!outputGeom) { if (!outputGeom) {
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
} }
*outputGeom = NULL; *outputGeom = NULL;
if (outputPreparedGeom) { // it means not to generate PreparedGeometry if outputPreparedGeom is NULL if (outputPreparedGeom) { // it means not to generate PreparedGeometry if outputPreparedGeom is NULL
@ -412,6 +428,8 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SGeosContext *geosCtx = NULL;
TAOS_CHECK_RETURN(getThreadLocalGeosCtx(&geosCtx));
*outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input)); *outputGeom = GEOSWKBReader_read_r(geosCtx->handle, geosCtx->WKBReader, varDataVal(input), varDataLen(input));
if (*outputGeom == NULL) { if (*outputGeom == NULL) {
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
@ -428,7 +446,8 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
} }
void destroyGeometry(GEOSGeometry **geom, const GEOSPreparedGeometry **preparedGeom) { void destroyGeometry(GEOSGeometry **geom, const GEOSPreparedGeometry **preparedGeom) {
SGeosContext *geosCtx = getThreadLocalGeosCtx(); SGeosContext *geosCtx = acquireThreadLocalGeosCtx();
if (!geosCtx) return;
if (preparedGeom && *preparedGeom) { if (preparedGeom && *preparedGeom) {
GEOSPreparedGeom_destroy_r(geosCtx->handle, *preparedGeom); GEOSPreparedGeom_destroy_r(geosCtx->handle, *preparedGeom);

View File

@ -115,7 +115,7 @@ void callMakePointAndCompareResult(int32_t type1, void *valueArray1, TDRowValT v
#define MAKE_POINT_FIRST_COLUMN_VALUES {2, 3, -4} #define MAKE_POINT_FIRST_COLUMN_VALUES {2, 3, -4}
#define MAKE_POINT_SECOND_COLUMN_VALUES {5, -6, -7} #define MAKE_POINT_SECOND_COLUMN_VALUES {5, -6, -7}
TEST(GeomIoFuncTest, makePointFunctionTwoColumns) { void geomIoFuncTestMakePointFunctionTwoColumns() {
const int32_t rowNum = 3; const int32_t rowNum = 3;
SScalarParam *pExpectedResult; SScalarParam *pExpectedResult;
TDRowValT valTypeArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM}; TDRowValT valTypeArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM};
@ -151,7 +151,7 @@ TEST(GeomIoFuncTest, makePointFunctionTwoColumns) {
destroyScalarParam(pExpectedResult, 1); destroyScalarParam(pExpectedResult, 1);
} }
TEST(GeomIoFuncTest, makePointFunctionConstant) { void geomIoFuncTestMakePointFunctionConstant() {
const int32_t rowNum = 3; const int32_t rowNum = 3;
SScalarParam *pExpectedResult; SScalarParam *pExpectedResult;
TDRowValT valTypeArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM}; TDRowValT valTypeArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM};
@ -188,7 +188,7 @@ TEST(GeomIoFuncTest, makePointFunctionConstant) {
destroyScalarParam(pExpectedResult, 1); destroyScalarParam(pExpectedResult, 1);
} }
TEST(GeomIoFuncTest, makePointFunctionWithNull) { void geomIoFuncTestMakePointFunctionWithNull() {
const int32_t rowNum = 3; const int32_t rowNum = 3;
SScalarParam *pExpectedResult; SScalarParam *pExpectedResult;
TDRowValT valTypeNormArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM}; TDRowValT valTypeNormArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM};
@ -244,7 +244,7 @@ TEST(GeomIoFuncTest, makePointFunctionWithNull) {
destroyScalarParam(pExpectedResult, 1); destroyScalarParam(pExpectedResult, 1);
} }
TEST(GeomIoFuncTest, geomFromTextFunction) { void geomIoFuncTestGeomFromTextFunction() {
const int32_t rowNum = 4; const int32_t rowNum = 4;
char strArray[rowNum][TSDB_MAX_BINARY_LEN]; char strArray[rowNum][TSDB_MAX_BINARY_LEN];
TDRowValT valTypeNormArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM}; TDRowValT valTypeNormArray[rowNum] = {TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM, TD_VTYPE_NORM};
@ -293,7 +293,7 @@ TEST(GeomIoFuncTest, geomFromTextFunction) {
callGeomFromTextWrapper4(strArray, valTypeNormArray, 1, TSDB_CODE_FUNC_FUNTION_PARA_VALUE); callGeomFromTextWrapper4(strArray, valTypeNormArray, 1, TSDB_CODE_FUNC_FUNTION_PARA_VALUE);
} }
TEST(GeomIoFuncTest, asTextFunction) { void geomIoFuncTestAsTextFunction() {
// column input has been tested in geomFromTextFunction // column input has been tested in geomFromTextFunction
TDRowValT valTypeArray[1] = {TD_VTYPE_NORM}; TDRowValT valTypeArray[1] = {TD_VTYPE_NORM};
@ -319,3 +319,27 @@ TEST(GeomIoFuncTest, asTextFunction) {
STR_TO_VARSTR(strInput, "XXX"); STR_TO_VARSTR(strInput, "XXX");
callAsTextWrapper2(TSDB_DATA_TYPE_GEOMETRY, strInput, valTypeArray, 1, TSDB_CODE_FUNC_FUNTION_PARA_VALUE); callAsTextWrapper2(TSDB_DATA_TYPE_GEOMETRY, strInput, valTypeArray, 1, TSDB_CODE_FUNC_FUNTION_PARA_VALUE);
} }
static void geomIoFuncTestImpl() {
geomIoFuncTestMakePointFunctionTwoColumns();
geomIoFuncTestMakePointFunctionConstant();
geomIoFuncTestMakePointFunctionWithNull();
geomIoFuncTestGeomFromTextFunction();
geomIoFuncTestAsTextFunction();
}
static void *geomIoFuncTestFunc(void *arg) {
geomIoFuncTestImpl();
return nullptr;
}
static void geomIoFuncTestInThread() {
TdThread thread;
ASSERT_EQ(taosThreadCreate(&thread, nullptr, geomIoFuncTestFunc, NULL), 0);
ASSERT_EQ(taosThreadJoin(thread, nullptr), 0);
}
TEST(threadGeomFuncTest, threadFuncTest) {
geomIoFuncTestImpl();
geomIoFuncTestInThread();
}

View File

@ -154,7 +154,7 @@ void geomRelationFuncTest(FScalarExecProcess geomRelationFunc, int8_t expectedRe
callGeomRelationFuncAndCompareResult(geomRelationFunc, pInput, rowNum, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, 0); callGeomRelationFuncAndCompareResult(geomRelationFunc, pInput, rowNum, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, 0);
} }
TEST(GeomRelationFuncTest, intersectsFunction) { void geomRelationFuncTestIntersectsFunction() {
// 1: true, 0: false, -1: null // 1: true, 0: false, -1: null
int8_t expectedResults[6][6] = { int8_t expectedResults[6][6] = {
{1, 0, 1, 1, 1, 1}, // two columns {1, 0, 1, 1, 1, 1}, // two columns
@ -168,7 +168,7 @@ TEST(GeomRelationFuncTest, intersectsFunction) {
geomRelationFuncTest(intersectsFunction, expectedResults); geomRelationFuncTest(intersectsFunction, expectedResults);
} }
TEST(GeomRelationFuncTest, equalsFunction) { void geomRelationFuncTestEqualsFunction() {
// 1: true, 0: false, -1: null // 1: true, 0: false, -1: null
int8_t expectedResults[6][6] = { int8_t expectedResults[6][6] = {
{1, 0, 0, 0, 0, 0}, // two columns {1, 0, 0, 0, 0, 0}, // two columns
@ -182,7 +182,7 @@ TEST(GeomRelationFuncTest, equalsFunction) {
geomRelationFuncTest(equalsFunction, expectedResults); geomRelationFuncTest(equalsFunction, expectedResults);
} }
TEST(GeomRelationFuncTest, touchesFunction) { void geomRelationFuncTestTouchesFunction() {
// 1: true, 0: false, -1: null // 1: true, 0: false, -1: null
int8_t expectedResults[6][6] = { int8_t expectedResults[6][6] = {
{0, 0, 1, 0, 0, 1}, // two columns {0, 0, 1, 0, 0, 1}, // two columns
@ -196,7 +196,7 @@ TEST(GeomRelationFuncTest, touchesFunction) {
geomRelationFuncTest(touchesFunction, expectedResults); geomRelationFuncTest(touchesFunction, expectedResults);
} }
TEST(GeomRelationFuncTest, coversFunction) { void geomRelationFuncTestCoversFunction() {
// 1: true, 0: false, -1: null // 1: true, 0: false, -1: null
int8_t expectedResults[6][6] = { int8_t expectedResults[6][6] = {
{1, 0, 0, 0, 0, 0}, // two columns {1, 0, 0, 0, 0, 0}, // two columns
@ -210,7 +210,7 @@ TEST(GeomRelationFuncTest, coversFunction) {
geomRelationFuncTest(coversFunction, expectedResults); geomRelationFuncTest(coversFunction, expectedResults);
} }
TEST(GeomRelationFuncTest, containsFunction) { void geomRelationFuncTestContainsFunction() {
// 1: true, 0: false, -1: null // 1: true, 0: false, -1: null
int8_t expectedResults[6][6] = { int8_t expectedResults[6][6] = {
{1, 0, 0, 0, 0, 0}, // two columns {1, 0, 0, 0, 0, 0}, // two columns
@ -224,7 +224,7 @@ TEST(GeomRelationFuncTest, containsFunction) {
geomRelationFuncTest(containsFunction, expectedResults); geomRelationFuncTest(containsFunction, expectedResults);
} }
TEST(GeomRelationFuncTest, containsProperlyFunction) { void geomRelationFuncTestContainsProperlyFunction() {
// 1: true, 0: false, -1: null // 1: true, 0: false, -1: null
int8_t expectedResults[6][6] = { int8_t expectedResults[6][6] = {
{1, 0, 0, 0, 0, 0}, // two columns {1, 0, 0, 0, 0, 0}, // two columns
@ -237,3 +237,28 @@ TEST(GeomRelationFuncTest, containsProperlyFunction) {
geomRelationFuncTest(containsProperlyFunction, expectedResults); geomRelationFuncTest(containsProperlyFunction, expectedResults);
} }
static void geomRelationFuncTestImpl() {
geomRelationFuncTestIntersectsFunction();
geomRelationFuncTestEqualsFunction();
geomRelationFuncTestTouchesFunction();
geomRelationFuncTestCoversFunction();
geomRelationFuncTestContainsFunction();
geomRelationFuncTestContainsProperlyFunction();
}
static void *geomRelationFuncTestFunc(void *arg) {
geomRelationFuncTestImpl();
return nullptr;
}
static void geomRelationFuncTestInThread() {
TdThread thread;
ASSERT_EQ(taosThreadCreate(&thread, nullptr, geomRelationFuncTestFunc, NULL), 0);
ASSERT_EQ(taosThreadJoin(thread, nullptr), 0);
}
TEST(threadGeomRelationFuncTest, threadGeomRelationFuncTest) {
geomRelationFuncTestImpl();
geomRelationFuncTestInThread();
}

View File

@ -655,7 +655,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
code = parseGeometry(pToken, &output, &size); code = parseGeometry(pToken, &output, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = buildSyntaxErrMsg(pMsgBuf, getThreadLocalGeosCtx()->errMsg, pToken->z); code = buildSyntaxErrMsg(pMsgBuf, getGeosErrMsg(code), pToken->z);
} else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { } else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {
// Too long values will raise the invalid sql error message // Too long values will raise the invalid sql error message
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
@ -1646,7 +1646,7 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
code = parseGeometry(pToken, &output, &size); code = parseGeometry(pToken, &output, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = buildSyntaxErrMsg(&pCxt->msg, getThreadLocalGeosCtx()->errMsg, pToken->z); code = buildSyntaxErrMsg(&pCxt->msg, getGeosErrMsg(code), pToken->z);
} }
// Too long values will raise the invalid sql error message // Too long values will raise the invalid sql error message
else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) { else if (size + VARSTR_HEADER_SIZE > pSchema->bytes) {

View File

@ -446,12 +446,12 @@ static FORCE_INLINE int32_t varToGeometry(char *buf, SScalarParam *pOut, int32_t
unsigned char *t = NULL; unsigned char *t = NULL;
char *output = NULL; char *output = NULL;
if (initCtxGeomFromText()) { if ((code = initCtxGeomFromText()) != 0) {
sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg); sclError("failed to init geometry ctx, %s", getGeosErrMsg(code));
SCL_ERR_JRET(TSDB_CODE_APP_ERROR); SCL_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
if (doGeomFromText(buf, &t, &len)) { if ((code = doGeomFromText(buf, &t, &len)) != 0) {
sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg); sclInfo("failed to convert text to geometry, %s", getGeosErrMsg(code));
SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR); SCL_ERR_JRET(TSDB_CODE_SCALAR_CONVERT_ERROR);
} }

View File

@ -842,3 +842,35 @@ void taosThreadTestCancel(void) {
void taosThreadClear(TdThread *thread) { void taosThreadClear(TdThread *thread) {
(void)memset(thread, 0, sizeof(TdThread)); (void)memset(thread, 0, sizeof(TdThread));
} }
#ifdef WINDOWS
bool taosThreadIsMain() {
DWORD curProcessId = GetCurrentProcessId();
DWORD curThreadId = GetCurrentThreadId();
DWORD dwThreadId = -1;
HANDLE hThreadSnapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
if (hThreadSnapshot == INVALID_HANDLE_VALUE) {
return false;
}
THREADENTRY32 te32;
te32.dwSize = sizeof(THREADENTRY32);
if (!Thread32First(hThreadSnapshot, &te32)) {
CloseHandle(hThreadSnapshot);
return false;
}
do {
if (te32.th32OwnerProcessID == curProcessId) {
dwThreadId = te32.th32ThreadID;
break;
}
} while (Thread32Next(hThreadSnapshot, &te32));
CloseHandle(hThreadSnapshot);
return curThreadId == dwThreadId;
}
#endif

View File

@ -14,39 +14,90 @@
*/ */
#include "tgeosctx.h" #include "tgeosctx.h"
#include "tdef.h" #include "tlog.h"
#include "tutil.h"
static threadlocal SGeosContext tlGeosCtx = {0}; static TdThreadKey tlGeosCtxKey = 0;
static int8_t tlGeosCtxKeyInited = 0;
SGeosContext* getThreadLocalGeosCtx() { return &tlGeosCtx; } static threadlocal SGeosContext *tlGeosCtx = NULL;
void destroyThreadLocalGeosCtx() { static void destroyThreadLocalGeosCtx(void *param) {
if (tlGeosCtx.WKTReader) { #ifdef WINDOWS
GEOSWKTReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTReader); if (taosThreadIsMain()) return;
tlGeosCtx.WKTReader = NULL; #endif
SGeosContext *pGeosCtx = (SGeosContext *)param;
if (!pGeosCtx) {
return;
}
if (pGeosCtx->WKTReader) {
GEOSWKTReader_destroy_r(pGeosCtx->handle, pGeosCtx->WKTReader);
pGeosCtx->WKTReader = NULL;
} }
if (tlGeosCtx.WKTWriter) { if (pGeosCtx->WKTWriter) {
GEOSWKTWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKTWriter); GEOSWKTWriter_destroy_r(pGeosCtx->handle, pGeosCtx->WKTWriter);
tlGeosCtx.WKTWriter = NULL; pGeosCtx->WKTWriter = NULL;
} }
if (tlGeosCtx.WKBReader) { if (pGeosCtx->WKBReader) {
GEOSWKBReader_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBReader); GEOSWKBReader_destroy_r(pGeosCtx->handle, pGeosCtx->WKBReader);
tlGeosCtx.WKBReader = NULL; pGeosCtx->WKBReader = NULL;
} }
if (tlGeosCtx.WKBWriter) { if (pGeosCtx->WKBWriter) {
GEOSWKBWriter_destroy_r(tlGeosCtx.handle, tlGeosCtx.WKBWriter); GEOSWKBWriter_destroy_r(pGeosCtx->handle, pGeosCtx->WKBWriter);
tlGeosCtx.WKBWriter = NULL; pGeosCtx->WKBWriter = NULL;
} }
if (tlGeosCtx.WKTRegex) { if (pGeosCtx->WKTRegex) {
destroyRegexes(tlGeosCtx.WKTRegex, tlGeosCtx.WKTMatchData); destroyRegexes(pGeosCtx->WKTRegex, pGeosCtx->WKTMatchData);
pGeosCtx->WKTRegex = NULL;
pGeosCtx->WKTMatchData = NULL;
} }
if (tlGeosCtx.handle) { if (pGeosCtx->handle) {
GEOS_finish_r(tlGeosCtx.handle); GEOS_finish_r(pGeosCtx->handle);
tlGeosCtx.handle = NULL; pGeosCtx->handle = NULL;
} }
taosMemoryFree(pGeosCtx);
}
SGeosContext *acquireThreadLocalGeosCtx() { return tlGeosCtx; }
int32_t getThreadLocalGeosCtx(SGeosContext **ppCtx) {
if ((*ppCtx = tlGeosCtx)) {
return 0;
}
int32_t code = 0, lino = 0;
if (atomic_val_compare_exchange_8(&tlGeosCtxKeyInited, 0, 1) == 0) {
if ((taosThreadKeyCreate(&tlGeosCtxKey, destroyThreadLocalGeosCtx)) != 0) {
atomic_store_8(&tlGeosCtxKeyInited, 0);
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
}
SGeosContext *tlGeosCtxObj = (SGeosContext *)taosMemoryCalloc(1, sizeof(SGeosContext));
if (!tlGeosCtxObj) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
if ((taosThreadSetSpecific(tlGeosCtxKey, (const void *)tlGeosCtxObj)) != 0) {
taosMemoryFreeClear(tlGeosCtxObj);
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
*ppCtx = tlGeosCtx = tlGeosCtxObj;
_exit:
if (code != 0) {
*ppCtx = NULL;
uError("failed to get geos context at line:%d since %s", lino, tstrerror(code));
}
TAOS_RETURN(code);
}
const char *getGeosErrMsg(int32_t code) {
return (tlGeosCtx && tlGeosCtx->errMsg[0] != 0) ? tlGeosCtx->errMsg : (code ? tstrerror(code) : "");
} }

View File

@ -178,7 +178,6 @@ void *taosProcessSchedQueue(void *scheduler) {
(*(msg.tfp))(msg.ahandle, msg.thandle); (*(msg.tfp))(msg.ahandle, msg.thandle);
} }
destroyThreadLocalGeosCtx();
return NULL; return NULL;
} }

View File

@ -105,7 +105,6 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1); taosUpdateItemSize(qinfo.queue, 1);
} }
destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp(); DestoryThreadLocalRegComp();
return NULL; return NULL;
@ -665,7 +664,6 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
} }
} }
destroyThreadLocalGeosCtx();
DestoryThreadLocalRegComp(); DestoryThreadLocalRegComp();
return NULL; return NULL;

View File

@ -613,14 +613,14 @@ void shellPrintGeometry(const unsigned char *val, int32_t length, int32_t width)
code = initCtxAsText(); code = initCtxAsText();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
shellPrintString(getThreadLocalGeosCtx()->errMsg, width); shellPrintString(getGeosErrMsg(code), width);
return; return;
} }
char *outputWKT = NULL; char *outputWKT = NULL;
code = doAsText(val, length, &outputWKT); code = doAsText(val, length, &outputWKT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
shellPrintString(getThreadLocalGeosCtx()->errMsg, width); // should NOT happen shellPrintString(getGeosErrMsg(code), width); // should NOT happen
return; return;
} }
@ -1284,7 +1284,6 @@ void *shellThreadLoop(void *arg) {
taosResetTerminalMode(); taosResetTerminalMode();
} while (shellRunCommand(command, true) == 0); } while (shellRunCommand(command, true) == 0);
destroyThreadLocalGeosCtx();
taosMemoryFreeClear(command); taosMemoryFreeClear(command);
shellWriteHistory(); shellWriteHistory();
shellExit(); shellExit();