From ddc13d294cef52d81451604498fe8d87accbcedc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jan 2021 02:57:58 +0000 Subject: [PATCH 01/11] share rpc obj --- src/client/inc/tsclient.h | 10 +++- src/client/src/tscSql.c | 16 +++--- src/client/src/tscSystem.c | 106 ++++++++++++++++++++++++++++--------- src/client/src/tscUtil.c | 7 +-- 4 files changed, 100 insertions(+), 39 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6879470145..0f2f0513a3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -307,6 +307,7 @@ typedef struct STscObj { struct SSqlStream *streamList; SRpcCorEpSet *tscCorMgmtEpSet; void* pDnodeConn; + void* pRpcObj; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj } STscObj; @@ -377,7 +378,14 @@ typedef struct SSqlStream { void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); -int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn); +typedef struct { + char key[512]; + void *pDnodeConn; +} SRpcObj; + +void *tscAcquireRpc(const char *insKey); +void tscReleaseRpc(void *param); +int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj, void** pDnodeConn); void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index fa7bc99a9f..e70db12a7b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -90,9 +90,12 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa } else { if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL; } + char rpcKey[512] = {0}; + sprintf(rpcKey, "%s:%s:%s:%d", user, pass, ip, port); + void *pRpcObj = NULL; void *pDnodeConn = NULL; - if (tscInitRpc(user, secretEncrypt, &pDnodeConn) != 0) { + if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj, &pDnodeConn) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } @@ -100,20 +103,21 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); if (NULL == pObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - rpcClose(pDnodeConn); + tscReleaseRpc(pRpcObj); return NULL; } // set up tscObj's mgmtEpSet pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet)); if (NULL == pObj->tscCorMgmtEpSet) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - rpcClose(pDnodeConn); + tscReleaseRpc(pRpcObj); free(pObj->tscCorMgmtEpSet); free(pObj); } memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); pObj->signature = pObj; + pObj->pRpcObj = pRpcObj; pObj->pDnodeConn = pDnodeConn; tstrncpy(pObj->user, user, sizeof(pObj->user)); @@ -125,7 +129,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa /* db name is too long */ if (len >= TSDB_DB_NAME_LEN) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; - rpcClose(pDnodeConn); + tscReleaseRpc(pRpcObj); free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; @@ -143,7 +147,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (NULL == pSql) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - rpcClose(pDnodeConn); + tscReleaseRpc(pRpcObj); free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; @@ -160,7 +164,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - rpcClose(pDnodeConn); + tscReleaseRpc(pRpcObj); free(pSql); free(pObj->tscCorMgmtEpSet); free(pObj); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index ff605dad72..4e23bd8a03 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -40,39 +40,81 @@ void *tscCheckDiskUsageTmr; int tscRefId = -1; int tscNumOfObj = 0; // number of sqlObj in current process. +int32_t tscNumOfThreads = 1; +void * tscRpcCache; +static pthread_mutex_t rpcObjMutex; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } - -int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeConn) { - SRpcInit rpcInit; - - if (*pDnodeConn == NULL) { - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "TSC"; - rpcInit.numOfThreads = 1; // every DB connection has only one thread - rpcInit.cfp = tscProcessMsgFromServer; - rpcInit.sessions = tsMaxConnections; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = (char *)user; - rpcInit.idleTime = 2000; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.secret = (char *)secretEncrypt; - - *pDnodeConn = rpcOpen(&rpcInit); - if (*pDnodeConn == NULL) { - tscError("failed to init connection to TDengine"); - return -1; - } else { - tscDebug("dnodeConn:%p is created, user:%s", *pDnodeConn, user); - } +void tscFreeRpcObj(void *param) { + assert(param); + SRpcObj *pRpcObj = (SRpcObj *)(param); + rpcClose(pRpcObj->pDnodeConn); +} +void *tscAcquireRpc(const char *key) { + SRpcObj *pRpcObj = taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); + if (pRpcObj == NULL) { + return NULL; } + return pRpcObj; +} +void tscReleaseRpc(void *param) { + if (param == NULL) { + return; + } + pthread_mutex_lock(&rpcObjMutex); + taosCacheRelease(tscRpcCache, (void *)¶m, false); + pthread_mutex_unlock(&rpcObjMutex); +} + +int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj, void **pDnodeConn) { + pthread_mutex_lock(&rpcObjMutex); + + SRpcObj *pRpcObj = (SRpcObj *)tscAcquireRpc(key); + if (pRpcObj != NULL) { + *ppRpcObj = pRpcObj; + *pDnodeConn = pRpcObj->pDnodeConn; + pthread_mutex_unlock(&rpcObjMutex); + return 0; + } + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "TSC"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = tscProcessMsgFromServer; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char *)user; + rpcInit.idleTime = 2000; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.secret = (char *)secretEncrypt; + + SRpcObj rpcObj; + memset(&rpcObj, 0, sizeof(rpcObj)); + strncpy(rpcObj.key, key, strlen(key)); + rpcObj.pDnodeConn = rpcOpen(&rpcInit); + if (rpcObj.pDnodeConn == NULL) { + pthread_mutex_unlock(&rpcObjMutex); + tscError("failed to init connection to TDengine"); + return -1; + } + pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*10); + if (pRpcObj == NULL) { + rpcClose(rpcObj.pDnodeConn); + pthread_mutex_unlock(&rpcObjMutex); + return -1; + } + + *ppRpcObj = pRpcObj; + *pDnodeConn = pRpcObj->pDnodeConn; + pthread_mutex_unlock(&rpcObjMutex); return 0; } @@ -113,7 +155,7 @@ void taos_init_imp(void) { int queueSize = tsMaxConnections*2; double factor = (tscEmbedded == 0)? 2.0:4.0; - int32_t tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); + tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); if (tscNumOfThreads < 2) { tscNumOfThreads = 2; } @@ -135,6 +177,8 @@ void taos_init_imp(void) { tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); tscHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); } + tscRpcCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, true, tscFreeRpcObj, "rpcObj"); + pthread_mutex_init(&rpcObjMutex, NULL); tscRefId = taosOpenRef(200, tscCloseTscObj); @@ -169,6 +213,16 @@ void taos_cleanup(void) { taosCloseRef(tscRefId); taosCleanupKeywordsTable(); taosCloseLog(); + + m = tscRpcCache; + if (m != NULL && atomic_val_compare_exchange_ptr(&tscRpcCache, m, 0) == m) { + pthread_mutex_lock(&rpcObjMutex); + taosCacheCleanup(tscRpcCache); + tscRpcCache = NULL; + pthread_mutex_unlock(&rpcObjMutex); + pthread_mutex_destroy(&rpcObjMutex); + } + if (tscEmbedded == 0) rpcCleanup(); m = tscTmr; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 52e26fe95a..7936c31634 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -895,15 +895,10 @@ void tscCloseTscObj(void *param) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); - void* p = pObj->pDnodeConn; - if (pObj->pDnodeConn != NULL) { - rpcClose(pObj->pDnodeConn); - pObj->pDnodeConn = NULL; - } + tscReleaseRpc(pObj->pRpcObj); tfree(pObj->tscCorMgmtEpSet); pthread_mutex_destroy(&pObj->mutex); - tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, p); tfree(pObj); } From ea28d23512387d910fb30d3282015c3c9ad02081 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jan 2021 05:48:53 +0000 Subject: [PATCH 02/11] avoid invalid read --- src/client/src/tscUtil.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7936c31634..c6182dcb5f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -458,14 +458,14 @@ void tscFreeRegisteredSqlObj(void *pSql) { SSqlObj* p = *(SSqlObj**)pSql; STscObj* pTscObj = p->pTscObj; + int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1); + int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1); + tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total); assert(RID_VALID(p->self)); tscFreeSqlObj(p); taosReleaseRef(tscRefId, pTscObj->rid); - int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1); - int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1); - tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total); } void tscFreeTableMetaHelper(void *pTableMeta) { From b00ac2f5bb57211a26415639f6affb86e18ced5e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jan 2021 10:49:13 +0000 Subject: [PATCH 03/11] share rpc obj --- src/client/inc/tsclient.h | 18 ++--- src/client/src/tscServer.c | 24 ++++--- src/client/src/tscSql.c | 27 ++------ src/client/src/tscSystem.c | 12 ++-- src/client/src/tscUtil.c | 1 - src/rpc/src/rpcTcp.c | 130 +++++++++++++++++++++++++------------ 6 files changed, 125 insertions(+), 87 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0f2f0513a3..c11c524f96 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -290,6 +290,12 @@ typedef struct { struct SLocalReducer *pLocalReducer; } SSqlRes; +typedef struct { + char key[512]; + SRpcCorEpSet *tscCorMgmtEpSet; + void *pDnodeConn; +} SRpcObj; + typedef struct STscObj { void * signature; void * pTimer; @@ -305,9 +311,7 @@ typedef struct STscObj { int64_t hbrid; struct SSqlObj * sqlList; struct SSqlStream *streamList; - SRpcCorEpSet *tscCorMgmtEpSet; - void* pDnodeConn; - void* pRpcObj; + SRpcObj *pRpcObj; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj } STscObj; @@ -378,14 +382,10 @@ typedef struct SSqlStream { void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); -typedef struct { - char key[512]; - void *pDnodeConn; -} SRpcObj; -void *tscAcquireRpc(const char *insKey); +void *tscAcquireRpc(const char *key); void tscReleaseRpc(void *param); -int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj, void** pDnodeConn); +int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj); void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index f7a2236262..8a56ccdc9b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -72,7 +72,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { } static void tscDumpMgmtEpSet(SSqlObj *pSql) { - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; taosCorBeginRead(&pCorEpSet->version); pSql->epSet = pCorEpSet->epSet; taosCorEndRead(&pCorEpSet->version); @@ -95,7 +95,7 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { } void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { // no need to update if equal - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; taosCorBeginWrite(&pCorEpSet->version); pCorEpSet->epSet = *pEpSet; taosCorEndWrite(&pCorEpSet->version); @@ -151,13 +151,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SRpcEpSet *epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); - if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) { - tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); - for (int8_t i = 0; i < epSet->numOfEps; i++) { - tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); - } - tscUpdateMgmtEpSet(pSql, epSet); - } + + //SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; + //if (!tscEpSetIsEqual(&pCorEpSet->epSet, epSet)) { + // tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); + // for (int8_t i = 0; i < epSet->numOfEps; i++) { + // tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); + // } + //} + //concurrency problem, update mgmt epset anyway + tscUpdateMgmtEpSet(pSql, epSet); } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -264,7 +267,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { .code = 0 }; - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); + + rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e70db12a7b..0f1824c1ed 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -94,8 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa sprintf(rpcKey, "%s:%s:%s:%d", user, pass, ip, port); void *pRpcObj = NULL; - void *pDnodeConn = NULL; - if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj, &pDnodeConn) != 0) { + if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } @@ -106,20 +105,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa tscReleaseRpc(pRpcObj); return NULL; } - // set up tscObj's mgmtEpSet - pObj->tscCorMgmtEpSet = (SRpcCorEpSet *)malloc(sizeof(SRpcCorEpSet)); - if (NULL == pObj->tscCorMgmtEpSet) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscReleaseRpc(pRpcObj); - free(pObj->tscCorMgmtEpSet); - free(pObj); - } - memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); - pObj->signature = pObj; - pObj->pRpcObj = pRpcObj; - pObj->pDnodeConn = pDnodeConn; - + pObj->pRpcObj = (SRpcObj *)pRpcObj; + memcpy(pObj->pRpcObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); tstrncpy(pObj->user, user, sizeof(pObj->user)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); memcpy(pObj->pass, secretEncrypt, secretEncryptLen); @@ -130,7 +118,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (len >= TSDB_DB_NAME_LEN) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; tscReleaseRpc(pRpcObj); - free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -148,7 +135,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (NULL == pSql) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscReleaseRpc(pRpcObj); - free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -166,7 +152,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tscReleaseRpc(pRpcObj); free(pSql); - free(pObj->tscCorMgmtEpSet); free(pObj); return NULL; } @@ -205,7 +190,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return NULL; } - tscDebug("%p DB connection is opening, dnodeConn:%p", pObj, pObj->pDnodeConn); + tscDebug("%p DB connection is opening, rpcObj: %p, dnodeConn:%p", pObj, pObj->pRpcObj, pObj->pRpcObj->pDnodeConn); taos_free_result(pSql); // version compare only requires the first 3 segments of the version string @@ -282,7 +267,7 @@ void taos_close(TAOS *taos) { return; } - tscDebug("%p try to free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); + tscDebug("%p try to free tscObj", pObj); if (pObj->signature != pObj) { tscDebug("%p already closed or invalid tscObj", pObj); return; @@ -302,7 +287,7 @@ void taos_close(TAOS *taos) { } } - tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); + tscDebug("%p all sqlObj are freed, free tscObj", pObj); taosRemoveRef(tscRefId, pObj->rid); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 4e23bd8a03..f37daee3ca 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -53,6 +53,7 @@ void tscFreeRpcObj(void *param) { assert(param); SRpcObj *pRpcObj = (SRpcObj *)(param); rpcClose(pRpcObj->pDnodeConn); + tfree(pRpcObj->tscCorMgmtEpSet); } void *tscAcquireRpc(const char *key) { SRpcObj *pRpcObj = taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); @@ -71,13 +72,12 @@ void tscReleaseRpc(void *param) { pthread_mutex_unlock(&rpcObjMutex); } -int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj, void **pDnodeConn) { +int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { pthread_mutex_lock(&rpcObjMutex); SRpcObj *pRpcObj = (SRpcObj *)tscAcquireRpc(key); if (pRpcObj != NULL) { *ppRpcObj = pRpcObj; - *pDnodeConn = pRpcObj->pDnodeConn; pthread_mutex_unlock(&rpcObjMutex); return 0; } @@ -86,7 +86,7 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; - rpcInit.numOfThreads = 1; + rpcInit.numOfThreads = tscNumOfThreads * 2; rpcInit.cfp = tscProcessMsgFromServer; rpcInit.sessions = tsMaxConnections; rpcInit.connType = TAOS_CONN_CLIENT; @@ -111,9 +111,13 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, pthread_mutex_unlock(&rpcObjMutex); return -1; } + pRpcObj->tscCorMgmtEpSet = malloc(sizeof(SRpcCorEpSet)); + if (pRpcObj->tscCorMgmtEpSet == NULL) { + rpcClose(rpcObj.pDnodeConn); + pthread_mutex_unlock(&rpcObjMutex); + } *ppRpcObj = pRpcObj; - *pDnodeConn = pRpcObj->pDnodeConn; pthread_mutex_unlock(&rpcObjMutex); return 0; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c6182dcb5f..5507748720 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -896,7 +896,6 @@ void tscCloseTscObj(void *param) { taosTmrStopA(&(pObj->pTimer)); tscReleaseRpc(pObj->pRpcObj); - tfree(pObj->tscCorMgmtEpSet); pthread_mutex_destroy(&pObj->mutex); tfree(pObj); diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 178b96c423..b82e197bf7 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -55,6 +55,13 @@ typedef struct SThreadObj { void *(*processData)(SRecvInfo *pPacket); } SThreadObj; +typedef struct { + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SThreadObj **pThreadObj; +} SClientObj; + typedef struct { SOCKET fd; uint32_t ip; @@ -121,6 +128,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; + pThreadObj->stop = false; } // initialize mutex, thread, fd which may fail @@ -171,6 +179,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } static void taosStopTcpThread(SThreadObj* pThreadObj) { + if (pThreadObj == NULL) { return;} // save thread into local variable and signal thread to stop pthread_t thread = pThreadObj->thread; if (!taosCheckPthreadValid(thread)) { @@ -275,48 +284,77 @@ static void *taosAcceptTcpConnection(void *arg) { return NULL; } -void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) { - SThreadObj *pThreadObj; +void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { + SClientObj *pClientObj = (SClientObj *)calloc(1, sizeof(SClientObj)); + if (pClientObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + + tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); + pClientObj->numOfThreads = numOfThreads; + pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); + if (pClientObj->pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + tfree(pClientObj); + terrno = TAOS_SYSTEM_ERROR(errno); + } + + int code = 0; pthread_attr_t thattr; - - pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj)); - memset(pThreadObj, 0, sizeof(SThreadObj)); - tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); - pThreadObj->ip = ip; - pThreadObj->shandle = shandle; - - if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { - tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); - free(pThreadObj); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter - if (pThreadObj->pollFd < 0) { - tError("%s failed to create TCP client epoll", label); - free(pThreadObj); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - pThreadObj->processData = fp; - pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); - pthread_attr_destroy(&thattr); - if (code != 0) { - taosCloseSocket(pThreadObj->pollFd); - free(pThreadObj); - terrno = TAOS_SYSTEM_ERROR(errno); - tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); - return NULL; + + for (int i = 0; i < numOfThreads; ++i) { + SThreadObj *pThreadObj = (SThreadObj *)calloc(1, sizeof(SThreadObj)); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j=0; jpThreadObj[j]); + free(pClientObj); + pthread_attr_destroy(&thattr); + return NULL; + } + pClientObj->pThreadObj[i] = pThreadObj; + taosResetPthread(&pThreadObj->thread); + pThreadObj->ip = ip; + pThreadObj->stop = false; + tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); + pThreadObj->shandle = shandle; + pThreadObj->processData = fp; } - tDebug("%s TCP client is initialized, ip:%u:%hu", label, ip, port); + // initialize mutex, thread, fd which may fail + for (int i = 0; i < numOfThreads; ++i) { + SThreadObj *pThreadObj = pClientObj->pThreadObj[i]; + code = pthread_mutex_init(&(pThreadObj->mutex), NULL); + if (code < 0) { + tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); + break; + } - return pThreadObj; + pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter + if (pThreadObj->pollFd < 0) { + tError("%s failed to create TCP epoll", label); + code = -1; + break; + } + + code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); + if (code != 0) { + tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); + break; + } + pThreadObj->threadId = i; + } + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosCleanUpTcpClient(pClientObj); + pClientObj = NULL; + } + return pClientObj; } void taosStopTcpClient(void *chandle) { @@ -327,15 +365,23 @@ void taosStopTcpClient(void *chandle) { } void taosCleanUpTcpClient(void *chandle) { - SThreadObj *pThreadObj = chandle; - if (pThreadObj == NULL) return; - - tDebug ("%s TCP client will be cleaned up", pThreadObj->label); - taosStopTcpThread(pThreadObj); + SClientObj *pClientObj = chandle; + if (pClientObj == NULL) return; + for (int i = 0; i < pClientObj->numOfThreads; ++i) { + SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; + taosStopTcpThread(pThreadObj); + } + + tDebug("%s TCP client is cleaned up", pClientObj->label); + tfree(pClientObj->pThreadObj); + tfree(pClientObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { - SThreadObj * pThreadObj = shandle; + SClientObj * pClientObj = shandle; + int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; + atomic_store_32(&pClientObj->index, index + 1); + SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); if (fd < 0) return NULL; From 19c3c7c6f3de40d02ef34593c6909a065a3f6969 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jan 2021 11:05:57 +0000 Subject: [PATCH 04/11] add debug log --- src/client/src/tscSystem.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index f37daee3ca..e4a1cbc7d5 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -52,6 +52,7 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { void tscFreeRpcObj(void *param) { assert(param); SRpcObj *pRpcObj = (SRpcObj *)(param); + tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn); rpcClose(pRpcObj->pDnodeConn); tfree(pRpcObj->tscCorMgmtEpSet); } From 3414548772e69ff920a4f159f380a98dab87438c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jan 2021 12:11:30 +0000 Subject: [PATCH 05/11] share mgmt ep set --- src/client/inc/tsclient.h | 2 +- src/client/src/tscSql.c | 3 +-- src/client/src/tscSystem.c | 3 ++- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c11c524f96..d6cf9a6553 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -385,7 +385,7 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); void *tscAcquireRpc(const char *key); void tscReleaseRpc(void *param); -int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj); +int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj, SRpcCorEpSet *corMgmtEpSet); void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0f1824c1ed..653574c8d0 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -94,7 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa sprintf(rpcKey, "%s:%s:%s:%d", user, pass, ip, port); void *pRpcObj = NULL; - if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj) != 0) { + if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj, &corMgmtEpSet) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } @@ -107,7 +107,6 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa } pObj->signature = pObj; pObj->pRpcObj = (SRpcObj *)pRpcObj; - memcpy(pObj->pRpcObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet)); tstrncpy(pObj->user, user, sizeof(pObj->user)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); memcpy(pObj->pass, secretEncrypt, secretEncryptLen); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index e4a1cbc7d5..41c0ae1bad 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -73,7 +73,7 @@ void tscReleaseRpc(void *param) { pthread_mutex_unlock(&rpcObjMutex); } -int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { +int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj, SRpcCorEpSet *corMgmtEpSet) { pthread_mutex_lock(&rpcObjMutex); SRpcObj *pRpcObj = (SRpcObj *)tscAcquireRpc(key); @@ -117,6 +117,7 @@ int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, rpcClose(rpcObj.pDnodeConn); pthread_mutex_unlock(&rpcObjMutex); } + memcpy(pRpcObj->tscCorMgmtEpSet, corMgmtEpSet, sizeof(*corMgmtEpSet)); *ppRpcObj = pRpcObj; pthread_mutex_unlock(&rpcObjMutex); From 31534ad1976858d9346561cd49713265c198b9ec Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Jan 2021 21:32:25 +0000 Subject: [PATCH 06/11] refactor rpc code --- src/client/inc/tsclient.h | 5 ++--- src/client/src/tscSql.c | 2 +- src/client/src/tscSystem.c | 11 ++--------- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d6cf9a6553..8c8e18c825 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -383,10 +383,9 @@ typedef struct SSqlStream { void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); -void *tscAcquireRpc(const char *key); +int tscAcquireRpc(const char *key, const char *user, const char *secret, SRpcCorEpSet *corMgmtEpSet, void **pRpcObj); void tscReleaseRpc(void *param); -int32_t tscInitRpc(const char *key, const char *user, const char *secret, void **pRpcObj, SRpcCorEpSet *corMgmtEpSet); -void tscInitMsgsFp(); +void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 653574c8d0..0730fffdd5 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -94,7 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa sprintf(rpcKey, "%s:%s:%s:%d", user, pass, ip, port); void *pRpcObj = NULL; - if (tscInitRpc(rpcKey, user, secretEncrypt, &pRpcObj, &corMgmtEpSet) != 0) { + if (tscAcquireRpc(rpcKey, user, secretEncrypt,&corMgmtEpSet, &pRpcObj) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 41c0ae1bad..7a92b4fc05 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -56,13 +56,6 @@ void tscFreeRpcObj(void *param) { rpcClose(pRpcObj->pDnodeConn); tfree(pRpcObj->tscCorMgmtEpSet); } -void *tscAcquireRpc(const char *key) { - SRpcObj *pRpcObj = taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); - if (pRpcObj == NULL) { - return NULL; - } - return pRpcObj; -} void tscReleaseRpc(void *param) { if (param == NULL) { @@ -73,10 +66,10 @@ void tscReleaseRpc(void *param) { pthread_mutex_unlock(&rpcObjMutex); } -int32_t tscInitRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj, SRpcCorEpSet *corMgmtEpSet) { +int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, SRpcCorEpSet *corMgmtEpSet, void **ppRpcObj) { pthread_mutex_lock(&rpcObjMutex); - SRpcObj *pRpcObj = (SRpcObj *)tscAcquireRpc(key); + SRpcObj *pRpcObj = (SRpcObj *)taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); if (pRpcObj != NULL) { *ppRpcObj = pRpcObj; pthread_mutex_unlock(&rpcObjMutex); From 3d0ce022f03b10c8998c1ec9722601682d49df60 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 3 Jan 2021 12:43:02 +0000 Subject: [PATCH 07/11] refactor code --- src/client/src/tscSql.c | 2 +- src/client/src/tscSystem.c | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0730fffdd5..2390df5f21 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -91,7 +91,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa if (tscSetMgmtEpSetFromCfg(tsFirst, tsSecond, &corMgmtEpSet) < 0) return NULL; } char rpcKey[512] = {0}; - sprintf(rpcKey, "%s:%s:%s:%d", user, pass, ip, port); + snprintf(rpcKey, sizeof(rpcKey), "%s:%s:%s:%d", user, pass, ip, port); void *pRpcObj = NULL; if (tscAcquireRpc(rpcKey, user, secretEncrypt,&corMgmtEpSet, &pRpcObj) != 0) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 7a92b4fc05..66fe4100b2 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -109,6 +109,7 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry if (pRpcObj->tscCorMgmtEpSet == NULL) { rpcClose(rpcObj.pDnodeConn); pthread_mutex_unlock(&rpcObjMutex); + return -1; } memcpy(pRpcObj->tscCorMgmtEpSet, corMgmtEpSet, sizeof(*corMgmtEpSet)); From b740129fcf19a7fb97f8afd40bdcff1835cb4519 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 3 Jan 2021 13:20:56 +0000 Subject: [PATCH 08/11] resolve conflicts --- src/client/src/tscSystem.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index ac68d6e31d..27d23b8ee5 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -223,7 +223,6 @@ void taos_cleanup(void) { taosCleanupKeywordsTable(); taosCloseLog(); -<<<<<<< HEAD m = tscRpcCache; if (m != NULL && atomic_val_compare_exchange_ptr(&tscRpcCache, m, 0) == m) { @@ -235,8 +234,6 @@ void taos_cleanup(void) { } if (tscEmbedded == 0) rpcCleanup(); -======= ->>>>>>> ca3888c190d0415c151964a89652811e37089afd if (tscEmbedded == 0) { rpcCleanup(); From 289ede045ea2826cc637c5df603681c7fcaf780a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 3 Jan 2021 13:41:17 +0000 Subject: [PATCH 09/11] handle conflict --- src/client/src/tscSystem.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 27d23b8ee5..ce0ceb689d 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -181,6 +181,8 @@ void taos_init_imp(void) { tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tscDebug("TableMeta:%p", tscTableMetaInfo); } + + int refreshTime = 5; tscRpcCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, true, tscFreeRpcObj, "rpcObj"); pthread_mutex_init(&rpcObjMutex, NULL); @@ -224,12 +226,11 @@ void taos_cleanup(void) { taosCleanupKeywordsTable(); taosCloseLog(); - m = tscRpcCache; - if (m != NULL && atomic_val_compare_exchange_ptr(&tscRpcCache, m, 0) == m) { - pthread_mutex_lock(&rpcObjMutex); - taosCacheCleanup(tscRpcCache); - tscRpcCache = NULL; - pthread_mutex_unlock(&rpcObjMutex); + p = tscRpcCache; + tscRpcCache = NULL; + + if (p != NULL) { + taosCacheCleanup(p); pthread_mutex_destroy(&rpcObjMutex); } From d1d1b4c80a1aca05406212fe26028fb2aa4b2c59 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 3 Jan 2021 15:40:03 +0000 Subject: [PATCH 10/11] delete redundancy code --- src/client/src/tscSystem.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index ce0ceb689d..bb10f9fae9 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -236,10 +236,6 @@ void taos_cleanup(void) { if (tscEmbedded == 0) rpcCleanup(); - if (tscEmbedded == 0) { - rpcCleanup(); - } - p = tscTmr; tscTmr = NULL; taosTmrCleanUp(p); From 236c46548798add8d37c33757b7bbe3ed8c367a1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 22 Jan 2021 23:40:35 +0000 Subject: [PATCH 11/11] fix case failure --- src/client/inc/tsclient.h | 4 ++-- src/client/src/tscServer.c | 6 +++--- src/client/src/tscSql.c | 12 +++++++++++- src/client/src/tscSystem.c | 10 +--------- src/client/src/tscUtil.c | 1 + 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e8b03c0db9..c9702ad1fc 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -299,7 +299,6 @@ typedef struct { typedef struct { char key[512]; - SRpcCorEpSet *tscCorMgmtEpSet; void *pDnodeConn; } SRpcObj; @@ -319,6 +318,7 @@ typedef struct STscObj { struct SSqlObj * sqlList; struct SSqlStream *streamList; SRpcObj *pRpcObj; + SRpcCorEpSet *tscCorMgmtEpSet; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj } STscObj; @@ -396,7 +396,7 @@ typedef struct SSqlStream { void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); -int tscAcquireRpc(const char *key, const char *user, const char *secret, SRpcCorEpSet *corMgmtEpSet, void **pRpcObj); +int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj); void tscReleaseRpc(void *param); void tscInitMsgsFp(); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 381ee15912..a81e47f3ad 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -69,7 +69,7 @@ static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { } static void tscDumpMgmtEpSet(SSqlObj *pSql) { - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; taosCorBeginRead(&pCorEpSet->version); pSql->epSet = pCorEpSet->epSet; taosCorEndRead(&pCorEpSet->version); @@ -94,7 +94,7 @@ bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { // no need to update if equal - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; taosCorBeginWrite(&pCorEpSet->version); pCorEpSet->epSet = *pEpSet; taosCorEndWrite(&pCorEpSet->version); @@ -158,7 +158,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); - //SRpcCorEpSet *pCorEpSet = pSql->pTscObj->pRpcObj->tscCorMgmtEpSet; + //SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; //if (!tscEpSetIsEqual(&pCorEpSet->epSet, epSet)) { // tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); // for (int8_t i = 0; i < epSet->numOfEps; i++) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 04e7e764f4..448eea16bf 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -94,7 +94,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa snprintf(rpcKey, sizeof(rpcKey), "%s:%s:%s:%d", user, pass, ip, port); void *pRpcObj = NULL; - if (tscAcquireRpc(rpcKey, user, secretEncrypt,&corMgmtEpSet, &pRpcObj) != 0) { + if (tscAcquireRpc(rpcKey, user, secretEncrypt, &pRpcObj) != 0) { terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; return NULL; } @@ -105,6 +105,16 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa tscReleaseRpc(pRpcObj); return NULL; } + + pObj->tscCorMgmtEpSet = malloc(sizeof(SRpcCorEpSet)); + if (pObj->tscCorMgmtEpSet == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscReleaseRpc(pRpcObj); + free(pObj); + return NULL; + } + memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(corMgmtEpSet)); + pObj->signature = pObj; pObj->pRpcObj = (SRpcObj *)pRpcObj; tstrncpy(pObj->user, user, sizeof(pObj->user)); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index bb10f9fae9..fe1c45bd39 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -58,7 +58,6 @@ void tscFreeRpcObj(void *param) { SRpcObj *pRpcObj = (SRpcObj *)(param); tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn); rpcClose(pRpcObj->pDnodeConn); - tfree(pRpcObj->tscCorMgmtEpSet); } void tscReleaseRpc(void *param) { @@ -70,7 +69,7 @@ void tscReleaseRpc(void *param) { pthread_mutex_unlock(&rpcObjMutex); } -int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, SRpcCorEpSet *corMgmtEpSet, void **ppRpcObj) { +int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) { pthread_mutex_lock(&rpcObjMutex); SRpcObj *pRpcObj = (SRpcObj *)taosCacheAcquireByKey(tscRpcCache, key, strlen(key)); @@ -109,13 +108,6 @@ int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncry pthread_mutex_unlock(&rpcObjMutex); return -1; } - pRpcObj->tscCorMgmtEpSet = malloc(sizeof(SRpcCorEpSet)); - if (pRpcObj->tscCorMgmtEpSet == NULL) { - rpcClose(rpcObj.pDnodeConn); - pthread_mutex_unlock(&rpcObjMutex); - return -1; - } - memcpy(pRpcObj->tscCorMgmtEpSet, corMgmtEpSet, sizeof(*corMgmtEpSet)); *ppRpcObj = pRpcObj; pthread_mutex_unlock(&rpcObjMutex); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 756dc93098..c77d5e5764 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -897,6 +897,7 @@ void tscCloseTscObj(void *param) { pObj->signature = NULL; taosTmrStopA(&(pObj->pTimer)); + tfree(pObj->tscCorMgmtEpSet); tscReleaseRpc(pObj->pRpcObj); pthread_mutex_destroy(&pObj->mutex);