[td-10564] refactor and add test cases.
This commit is contained in:
parent
33e384a7aa
commit
bf97ce3d9d
|
@ -13,8 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "clientInt.h"
|
|
||||||
#include "trpc.h"
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
|
@ -25,23 +23,52 @@
|
||||||
#include "tscLog.h"
|
#include "tscLog.h"
|
||||||
#include "tsched.h"
|
#include "tsched.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
#include "trpc.h"
|
||||||
#include "ttimezone.h"
|
#include "ttimezone.h"
|
||||||
|
#include "clientInt.h"
|
||||||
|
|
||||||
#define TSC_VAR_NOT_RELEASE 1
|
#define TSC_VAR_NOT_RELEASE 1
|
||||||
#define TSC_VAR_RELEASED 0
|
#define TSC_VAR_RELEASED 0
|
||||||
|
|
||||||
SAppInfo appInfo;
|
SAppInfo appInfo;
|
||||||
int32_t tscReqRef = -1;
|
int32_t tscReqRef = -1;
|
||||||
void *tscQhandle;
|
|
||||||
int32_t tscConnRef = -1;
|
int32_t tscConnRef = -1;
|
||||||
void *tscRpcCache; // TODO removed from here.
|
void *tscQhandle = NULL;
|
||||||
|
void *tscRpcCache= NULL; // TODO removed from here.
|
||||||
|
|
||||||
pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
|
pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
|
||||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
volatile int32_t tscInitRes = 0;
|
||||||
static pthread_mutex_t setConfMutex = PTHREAD_MUTEX_INITIALIZER;
|
|
||||||
|
|
||||||
// pthread_once can not return result code, so result code is set to a global variable.
|
static void registerRequest(SRequestObj* pRequest) {
|
||||||
static volatile int tscInitRes = 0;
|
STscObj*pTscObj = (STscObj*) taosAcquireRef(tscConnRef, pRequest->pTscObj->id);
|
||||||
|
assert(pTscObj != NULL);
|
||||||
|
|
||||||
|
// connection has been released already, abort creating request.
|
||||||
|
pRequest->self = taosAddRef(tscReqRef, pRequest);
|
||||||
|
|
||||||
|
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
|
||||||
|
|
||||||
|
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
|
int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1);
|
||||||
|
int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1);
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" new Request from 0x%"PRIx64", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void deregisterRequest(SRequestObj* pRequest) {
|
||||||
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
|
taosReleaseRef(tscReqRef, pRequest->self);
|
||||||
|
|
||||||
|
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
|
||||||
|
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" free Request from 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
|
||||||
|
taosReleaseRef(tscConnRef, pTscObj->id);
|
||||||
|
}
|
||||||
|
|
||||||
void tscFreeRpcObj(void *param) {
|
void tscFreeRpcObj(void *param) {
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -139,22 +166,6 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t
|
||||||
pObj->id = taosAddRef(tscConnRef, pObj);
|
pObj->id = taosAddRef(tscConnRef, pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void registerRequest(SRequestObj* pRequest) {
|
|
||||||
STscObj*pTscObj = (STscObj*) taosAcquireRef(tscConnRef, pRequest->pTscObj->id);
|
|
||||||
assert(pTscObj != NULL);
|
|
||||||
|
|
||||||
// connection has been released already, abort creating request.
|
|
||||||
pRequest->self = taosAddRef(tscReqRef, pRequest);
|
|
||||||
|
|
||||||
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
|
|
||||||
|
|
||||||
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
|
|
||||||
int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1);
|
|
||||||
int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1);
|
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" new Request from 0x%"PRIx64", current:%d, app current:%d, total:%d", pRequest->self, pRequest->pTscObj->id, num, currentInst, total);
|
|
||||||
}
|
|
||||||
|
|
||||||
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) {
|
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) {
|
||||||
assert(pObj != NULL);
|
assert(pObj != NULL);
|
||||||
|
|
||||||
|
@ -176,21 +187,6 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
|
||||||
registerRequest(pRequest);
|
registerRequest(pRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void deregisterRequest(SRequestObj* pRequest) {
|
|
||||||
assert(pRequest != NULL);
|
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
|
||||||
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
|
|
||||||
|
|
||||||
taosReleaseRef(tscReqRef, pRequest->self);
|
|
||||||
|
|
||||||
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
|
|
||||||
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" free Request from 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
|
|
||||||
taosReleaseRef(tscConnRef, pTscObj->id);
|
|
||||||
}
|
|
||||||
|
|
||||||
void destroyRequest(void* p) {
|
void destroyRequest(void* p) {
|
||||||
assert(p != NULL);
|
assert(p != NULL);
|
||||||
SRequestObj* pRequest = *(SRequestObj**)p;
|
SRequestObj* pRequest = *(SRequestObj**)p;
|
||||||
|
|
Loading…
Reference in New Issue