From bf97ce3d9d348cf205cae0a93ba7311ccb3da5bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Dec 2021 16:28:51 +0800 Subject: [PATCH] [td-10564] refactor and add test cases. --- source/client/src/tscEnv.c | 76 ++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index 26ae9a8006..0d667b0d19 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#include "clientInt.h" -#include "trpc.h" #include "os.h" #include "taosmsg.h" #include "tcache.h" @@ -25,23 +23,52 @@ #include "tscLog.h" #include "tsched.h" #include "ttime.h" +#include "trpc.h" #include "ttimezone.h" +#include "clientInt.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 SAppInfo appInfo; -int32_t tscReqRef = -1; -void *tscQhandle; +int32_t tscReqRef = -1; int32_t tscConnRef = -1; -void *tscRpcCache; // TODO removed from here. +void *tscQhandle = NULL; +void *tscRpcCache= NULL; // TODO removed from here. pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently -static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER; +volatile int32_t tscInitRes = 0; -// pthread_once can not return result code, so result code is set to a global variable. -static volatile int tscInitRes = 0; +static void registerRequest(SRequestObj* pRequest) { + STscObj*pTscObj = (STscObj*) taosAcquireRef(tscConnRef, pRequest->pTscObj->id); + assert(pTscObj != NULL); + + // connection has been released already, abort creating request. + pRequest->self = taosAddRef(tscReqRef, pRequest); + + int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); + + SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; + int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1); + int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1); + + tscDebug("0x%"PRIx64" new Request from 0x%"PRIx64", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total); +} + +static void deregisterRequest(SRequestObj* pRequest) { + assert(pRequest != NULL); + + STscObj* pTscObj = pRequest->pTscObj; + SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; + + taosReleaseRef(tscReqRef, pRequest->self); + + int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); + int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); + + tscDebug("0x%"PRIx64" free Request from 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); + taosReleaseRef(tscConnRef, pTscObj->id); +} void tscFreeRpcObj(void *param) { #if 0 @@ -139,22 +166,6 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t pObj->id = taosAddRef(tscConnRef, pObj); } -static void registerRequest(SRequestObj* pRequest) { - STscObj*pTscObj = (STscObj*) taosAcquireRef(tscConnRef, pRequest->pTscObj->id); - assert(pTscObj != NULL); - - // connection has been released already, abort creating request. - pRequest->self = taosAddRef(tscReqRef, pRequest); - - int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); - - SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; - int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1); - int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1); - - tscDebug("0x%"PRIx64" new Request from 0x%"PRIx64", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total); -} - void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) { assert(pObj != NULL); @@ -176,21 +187,6 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty registerRequest(pRequest); } -static void deregisterRequest(SRequestObj* pRequest) { - assert(pRequest != NULL); - - STscObj* pTscObj = pRequest->pTscObj; - SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; - - taosReleaseRef(tscReqRef, pRequest->self); - - int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); - int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); - - tscDebug("0x%"PRIx64" free Request from 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); - taosReleaseRef(tscConnRef, pTscObj->id); -} - void destroyRequest(void* p) { assert(p != NULL); SRequestObj* pRequest = *(SRequestObj**)p;