From 14952d7fb4af670770bfb99a5841073d898ba8ce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Dec 2021 15:08:06 +0800 Subject: [PATCH] [td-10564] fix bug in taos_close --- source/client/inc/clientInt.h | 4 ++-- source/client/src/clientImpl.c | 6 +++-- source/client/src/clientmain.c | 3 +++ source/client/src/tscEnv.c | 35 ++++++++++++++---------------- source/client/test/clientTests.cpp | 5 ++++- 5 files changed, 29 insertions(+), 24 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 749894444e..3180923aff 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -58,7 +58,7 @@ typedef struct SAppInstInfo { SCorEpSet mgmtEp; SInstanceActivity summary; SList *pConnList; // STscObj linked list - char clusterId[TSDB_CLUSTER_ID_LEN]; + uint32_t clusterId; void *pTransporter; } SAppInstInfo; @@ -127,7 +127,7 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t void destroyTscObj(void*pObj); void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); -void destroyRequest(void* p); +void destroyRequest(SRequestObj* pRequest); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0f9b40e262..47b2c61ffc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -172,7 +172,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("%p connection is opening, dnodeConn:%p", pTscObj, pTscObj->pTransporter); + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); destroyRequest(pRequest); } @@ -267,6 +267,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { tstrerror(pMsg->code), pMsg->contLen); } - taosReleaseRef(requestRefId, requestRefId); + taosReleaseRef(tscReqRef, requestRefId); rpcFreeCont(pMsg->pCont); + + sem_post(&pRequest->body.rspSem); } diff --git a/source/client/src/clientmain.c b/source/client/src/clientmain.c index bda2311f97..ba80135850 100644 --- a/source/client/src/clientmain.c +++ b/source/client/src/clientmain.c @@ -70,7 +70,10 @@ void taos_close(TAOS* taos) { return; } + STscObj *pTscObj = (STscObj *)taos; + tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); + taosRemoveRef(tscConnRef, pTscObj->id); } const char *taos_errstr(TAOS_RES *res) { diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index 7565dd0c53..d37eb1ebcb 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -36,8 +36,6 @@ int32_t tscConnRef = -1; void *tscQhandle = NULL; int32_t tsNumOfThreads = 1; - -pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj* pRequest) { @@ -65,12 +63,10 @@ static void deregisterRequest(SRequestObj* pRequest) { STscObj* pTscObj = pRequest->pTscObj; SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; - taosReleaseRef(tscReqRef, pRequest->self); - int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); - tscDebug("0x%"PRIx64" free Request from 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); + tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); taosReleaseRef(tscConnRef, pTscObj->id); } @@ -90,15 +86,6 @@ static void tscInitLogFile() { } } -void tscFreeRpcObj(void *param) { -#if 0 - assert(param); - SRpcObj *pRpcObj = (SRpcObj *)(param); - tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn); - rpcClose(pRpcObj->pDnodeConn); -#endif -} - void closeTransporter(STscObj* pTscObj) { if (pTscObj == NULL || pTscObj->pTransporter == NULL) { return; @@ -136,7 +123,9 @@ void* openTransporter(const char *user, const char *auth) { void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - tscDebug("connect obj destroyed, 0x%"PRIx64, pTscObj->id); + + atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); closeTransporter(pTscObj); pthread_mutex_destroy(&pTscObj->mutex); @@ -161,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t pthread_mutex_init(&pObj->mutex, NULL); pObj->id = taosAddRef(tscConnRef, pObj); - tscDebug("connect obj created, 0x%"PRIx64, pObj->id); + tscDebug("connObj created, 0x%"PRIx64, pObj->id); return pObj; } @@ -189,9 +178,9 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty return pRequest; } -void destroyRequest(void* p) { +static void doDestroyRequest(void* p) { assert(p != NULL); - SRequestObj* pRequest = *(SRequestObj**)p; + SRequestObj* pRequest = (SRequestObj*)p; assert(RID_VALID(pRequest->self)); @@ -202,6 +191,14 @@ void destroyRequest(void* p) { deregisterRequest(pRequest); } +void destroyRequest(SRequestObj* pRequest) { + if (pRequest == NULL) { + return; + } + + taosReleaseRef(tscReqRef, pRequest->self); +} + void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. @@ -242,7 +239,7 @@ void taos_init_imp(void) { tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads); tscConnRef = taosOpenRef(200, destroyTscObj); - tscReqRef = taosOpenRef(40960, destroyRequest); + tscReqRef = taosOpenRef(40960, doDestroyRequest); taosGetAppName(appInfo.appName, NULL); appInfo.pid = taosGetPId(); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index ca6e2b44a8..8da4caf7eb 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -33,5 +33,8 @@ int main(int argc, char** argv) { } TEST(testCase, driverInit_Test) { - TAOS* pTaos = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + taos_close(pConn); } \ No newline at end of file