From 73dfd1173a914c2e15cb0f7b277ed3d0cd592b41 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 27 Jun 2022 09:50:42 +0800 Subject: [PATCH] enh: stop query --- include/client/taos.h | 6 ++--- source/client/inc/clientInt.h | 2 +- source/client/src/clientEnv.c | 22 ++++++++---------- source/client/src/clientImpl.c | 34 ++++++++++++++++++---------- source/client/src/clientMain.c | 18 +++++++++++---- source/client/src/clientMsgHandler.c | 4 ++-- source/client/src/clientSml.c | 22 +++++++++++------- 7 files changed, 65 insertions(+), 43 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 5e7f12de0a..669512e9b1 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -130,10 +130,10 @@ DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT int taos_init(void); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); -const char *taos_data_type(int type); +const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index cad262a00f..48efe89361 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -139,7 +139,7 @@ typedef struct STscObj { int8_t connType; int32_t acctId; uint32_t connId; - TAOS* id; // ref ID returned by taosAddRef + int64_t id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 500933b68b..cad1b4b1cf 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static int32_t registerRequest(SRequestObj *pRequest) { - STscObj *pTscObj = acquireTscObj(*(int64_t *)pRequest->pTscObj->id); + STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; return terrno; @@ -56,7 +56,7 @@ static int32_t registerRequest(SRequestObj *pRequest) { int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1); tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, - pRequest->self, *(int64_t *)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); + pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } return TSDB_CODE_SUCCESS; @@ -74,8 +74,8 @@ static void deregisterRequest(SRequestObj *pRequest) { int64_t duration = taosGetTimestampUs() - pRequest->metric.start; tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", - pRequest->self, *(int64_t *)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); - releaseTscObj(*(int64_t *)pTscObj->id); + pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst); + releaseTscObj(pTscObj->id); } // todo close the transporter properly @@ -84,7 +84,7 @@ void closeTransporter(STscObj *pTscObj) { return; } - tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t *)pTscObj->id); + tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); rpcClose(pTscObj->pAppInfo->pTransporter); } @@ -133,16 +133,15 @@ void closeAllRequests(SHashObj *pRequests) { void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - SClientHbKey connKey = {.tscRid = *(int64_t *)pTscObj->id, .connType = pTscObj->connType}; + SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); if (0 == connNum) { - // TODO - // closeTransporter(pTscObj); + closeTransporter(pTscObj); } - tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id, + tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); @@ -172,11 +171,10 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c } taosThreadMutexInit(&pObj->mutex, NULL); - pObj->id = taosMemoryMalloc(sizeof(int64_t)); - *(int64_t *)pObj->id = taosAddRef(clientConnRefPool, pObj); + pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->schemalessType = 1; - tscDebug("connObj created, 0x%" PRIx64, *(int64_t *)pObj->id); + tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj); return pObj; } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 00bc7200b1..d3510b91ae 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1170,7 +1170,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t taos_close_internal(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, *(int64_t*)pTscObj->id, + tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -1333,7 +1333,9 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); if (pObj) { - return pObj->id; + int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); + *rid = pObj->id; + return (TAOS*)rid; } return NULL; @@ -2016,11 +2018,18 @@ void syncQueryFn(void* param, void* res, int32_t code) { } void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + fp(param, NULL, terrno); + return; + } + + int64_t rid = *(int64_t*)taos; + STscObj* pTscObj = acquireTscObj(rid); if (pTscObj == NULL || sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; if (pTscObj) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); } else { terrno = TSDB_CODE_TSC_DISCONNECTED; } @@ -2032,7 +2041,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; - releaseTscObj(*(int64_t *)taos); + releaseTscObj(rid); fp(param, NULL, terrno); return; @@ -2042,7 +2051,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (code != TSDB_CODE_SUCCESS) { terrno = code; - releaseTscObj(*(int64_t *)taos); + releaseTscObj(rid); fp(param, NULL, terrno); return; } @@ -2051,7 +2060,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* pRequest->body.queryFp = fp; pRequest->body.param = param; doAsyncQuery(pRequest, false); - releaseTscObj(*(int64_t *)taos); + releaseTscObj(rid); } TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { @@ -2060,7 +2069,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { return NULL; } - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + int64_t rid = *(int64_t*)taos; + STscObj* pTscObj = acquireTscObj(rid); if (pTscObj == NULL || sql == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -2070,16 +2080,16 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); tsem_init(¶m->sem, 0, 0); - taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly); + taosAsyncQueryImpl((TAOS*)&rid, sql, syncQueryFn, param, validateOnly); tsem_wait(¶m->sem); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); return param->pRequest; #else size_t sqlLen = strlen(sql); if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; return NULL; @@ -2087,7 +2097,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); return pRes; #endif diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index bbd477fa3b..a24e6faf7f 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -93,7 +93,9 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); if (pObj) { - return pObj->id; + int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); + *rid = pObj->id; + return (TAOS*)rid; } return NULL; @@ -105,9 +107,9 @@ void taos_close_internal(void *taos) { } STscObj *pTscObj = (STscObj *)taos; - tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t *)pTscObj->id, pTscObj->numOfReqs); + tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - taosRemoveRef(clientConnRefPool, *(int64_t *)pTscObj->id); + taosRemoveRef(clientConnRefPool, pTscObj->id); } void taos_close(TAOS *taos) { @@ -880,6 +882,12 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { } int taos_load_table_info(TAOS *taos, const char *tableNameList) { + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return terrno; + } + + int64_t rid = *(int64_t*)taos; const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list int32_t code = 0; SRequestObj *pRequest = NULL; @@ -897,7 +905,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return TSDB_CODE_TSC_INVALID_OPERATION; } - STscObj *pTscObj = acquireTscObj(*(int64_t *)taos); + STscObj *pTscObj = acquireTscObj(rid); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return terrno; @@ -942,7 +950,7 @@ _return: taosArrayDestroy(catalogReq.pTableMeta); destroyRequest(pRequest); - releaseTscObj(*(int64_t *)taos); + releaseTscObj(rid); return code; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 45a525d124..818762eff6 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -77,7 +77,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) { tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, - connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, *(int64_t*)pTscObj->id); + connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id); } pTscObj->connId = connectRsp.connId; @@ -91,7 +91,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connType = connectRsp.connType; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, *(int64_t*)pTscObj->id, connectRsp.clusterId, connectRsp.connType); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7d2bf019d2..1cb0e2d54b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_ADD_COLUMN: { int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); const char *errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { @@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_ADD_TAG: { int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); const char *errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { @@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { case SCHEMA_ACTION_CHANGE_TAG_SIZE: { int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName); smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes); - TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) { pos--; ++freeBytes; outBytes = snprintf(pos, freeBytes, ")"); - TAOS_RES *res = taos_query(info->taos->id, result); + TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res)); @@ -2436,7 +2436,13 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); + if (NULL == taos) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + int64_t rid = *(int64_t*)taos; + STscObj* pTscObj = acquireTscObj(rid); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; uError("SML:taos_schemaless_insert invalid taos"); @@ -2445,7 +2451,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); if(!request){ - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); uError("SML:taos_schemaless_insert error request is null"); return NULL; } @@ -2533,6 +2539,6 @@ end: // ((STscObj *)taos)->schemalessType = 0; pTscObj->schemalessType = 1; uDebug("resultend:%s", request->msgBuf); - releaseTscObj(*(int64_t*)taos); + releaseTscObj(rid); return (TAOS_RES*)request; }