enhance interface
This commit is contained in:
parent
fbdb47fe6a
commit
c901a21a8c
|
@ -134,10 +134,12 @@ typedef struct {
|
|||
// int16_t numOfTry; // number of try for different servers
|
||||
// int8_t oldInUse; // server EP inUse passed by app
|
||||
// int8_t redirect; // flag to indicate redirect
|
||||
int8_t connType; // connection type
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
SRpcMsg* pRsp; // for synchronous API
|
||||
tsem_t* pSem; // for synchronous API
|
||||
int8_t connType; // connection type
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
|
||||
SRpcMsg* pRsp; // for synchronous API
|
||||
tsem_t* pSem; // for synchronous API
|
||||
|
||||
char* ip;
|
||||
uint32_t port;
|
||||
// SEpSet* pSet; // for synchronous API
|
||||
|
|
|
@ -813,8 +813,8 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
|
|||
SRpcInfo *pRpc = pContext->pRpc;
|
||||
SEpSet * pEpSet = &pContext->epSet;
|
||||
|
||||
pConn =
|
||||
rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
||||
pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port,
|
||||
pContext->connType);
|
||||
if (pConn == NULL || pConn->user[0] == 0) {
|
||||
pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
||||
}
|
||||
|
|
|
@ -63,17 +63,41 @@ void rpcFreeCont(void* cont) {
|
|||
}
|
||||
free((char*)cont - TRANS_MSG_OVERHEAD);
|
||||
}
|
||||
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
||||
void* rpcReallocCont(void* ptr, int contLen) {
|
||||
if (ptr == NULL) {
|
||||
return rpcMallocCont(contLen);
|
||||
}
|
||||
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
|
||||
int sz = contLen + TRANS_MSG_OVERHEAD;
|
||||
st = realloc(st, sz);
|
||||
if (st == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
return st + TRANS_MSG_OVERHEAD;
|
||||
}
|
||||
|
||||
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
|
||||
SRpcMsg rpcMsg;
|
||||
memset(&rpcMsg, 0, sizeof(rpcMsg));
|
||||
|
||||
rpcMsg.contLen = sizeof(SEpSet);
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
if (rpcMsg.pCont == NULL) return;
|
||||
|
||||
memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
|
||||
|
||||
rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
|
||||
rpcMsg.handle = thandle;
|
||||
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
|
||||
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
|
||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
|
||||
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
|
||||
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
||||
void rpcCancelRequest(int64_t rid) { return; }
|
||||
|
||||
int32_t rpcInit(void) {
|
||||
// impl later
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void rpcCleanup(void) {
|
||||
|
|
|
@ -123,9 +123,14 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
rpcMsg.code = pHead->code;
|
||||
rpcMsg.msgType = pHead->msgType;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
|
||||
tDebug("conn %p handle resp", conn);
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
if (pCtx->pSem == NULL) {
|
||||
tDebug("conn %p handle resp", conn);
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
} else {
|
||||
tDebug("conn %p handle resp", conn);
|
||||
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||
tsem_post(pCtx->pSem);
|
||||
}
|
||||
conn->notifyCount += 1;
|
||||
|
||||
// buf's mem alread translated to rpcMsg.pCont
|
||||
|
@ -159,14 +164,20 @@ static void clientHandleExcept(SCliConn* pConn) {
|
|||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
|
||||
pConn->notifyCount += 1;
|
||||
if (pCtx->pSem == NULL) {
|
||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
|
||||
} else {
|
||||
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
||||
// SRpcMsg rpcMsg
|
||||
tsem_post(pCtx->pSem);
|
||||
}
|
||||
|
||||
destroyCmsg(pMsg);
|
||||
pConn->data = NULL;
|
||||
// transDestroyConnCtx(pCtx);
|
||||
clientConnDestroy(pConn, true);
|
||||
pConn->notifyCount += 1;
|
||||
}
|
||||
|
||||
static void clientTimeoutCb(uv_timer_t* handle) {
|
||||
|
@ -463,6 +474,7 @@ static void clientAsyncCb(uv_async_t* handle) {
|
|||
|
||||
static void* clientThread(void* arg) {
|
||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||
setThreadName("trans-client-work");
|
||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
||||
|
@ -568,8 +580,8 @@ void taosCloseClient(void* arg) {
|
|||
}
|
||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||
// impl later
|
||||
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
|
||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||
|
||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||
|
||||
|
@ -609,4 +621,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
|||
// int end = taosGetTimestampUs() - start;
|
||||
// tError("client sent to rpc, time cost: %d", (int)end);
|
||||
}
|
||||
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||
|
||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||
|
||||
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||
pCtx->pTransInst = (SRpcInfo*)shandle;
|
||||
pCtx->ahandle = pReq->ahandle;
|
||||
pCtx->msgType = pReq->msgType;
|
||||
pCtx->ip = strdup(ip);
|
||||
pCtx->port = port;
|
||||
pCtx->pSem = calloc(1, sizeof(tsem_t));
|
||||
pCtx->pRsp = pRsp;
|
||||
tsem_init(pCtx->pSem, 0, 0);
|
||||
|
||||
int64_t index = pRpc->index;
|
||||
if (pRpc->index++ >= pRpc->numOfThreads) {
|
||||
pRpc->index = 0;
|
||||
}
|
||||
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
||||
cliMsg->ctx = pCtx;
|
||||
cliMsg->msg = *pReq;
|
||||
cliMsg->st = taosGetTimestampUs();
|
||||
|
||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
||||
|
||||
// pthread_mutex_lock(&thrd->msgMtx);
|
||||
// QUEUE_PUSH(&thrd->msg, &cliMsg->q);
|
||||
// pthread_mutex_unlock(&thrd->msgMtx);
|
||||
|
||||
// int start = taosGetTimestampUs();
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
|
||||
tsem_t* pSem = pCtx->pSem;
|
||||
tsem_wait(pSem);
|
||||
tsem_destroy(pSem);
|
||||
free(pSem);
|
||||
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -33,6 +33,8 @@ typedef struct SSrvConn {
|
|||
void* hostThrd;
|
||||
void* pSrvMsg;
|
||||
|
||||
struct sockaddr peername;
|
||||
|
||||
// SRpcMsg sendMsg;
|
||||
// del later
|
||||
char secured;
|
||||
|
@ -487,7 +489,13 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
uv_os_fd_t fd;
|
||||
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
||||
tDebug("conn %p created, fd: %d", pConn, fd);
|
||||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
||||
int namelen = sizeof(pConn->peername);
|
||||
if (0 != uv_tcp_getpeername(pConn->pTcp, &pConn->peername, &namelen)) {
|
||||
tError("failed to get peer name");
|
||||
destroyConn(pConn, true);
|
||||
} else {
|
||||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
||||
}
|
||||
} else {
|
||||
tDebug("failed to create new connection");
|
||||
destroyConn(pConn, true);
|
||||
|
@ -496,6 +504,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
|
||||
void* acceptThread(void* arg) {
|
||||
// opt
|
||||
setThreadName("trans-accept");
|
||||
SServerObj* srv = (SServerObj*)arg;
|
||||
uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
@ -548,6 +557,7 @@ static bool addHandleToAcceptloop(void* arg) {
|
|||
return true;
|
||||
}
|
||||
void* workerThread(void* arg) {
|
||||
setThreadName("trans-worker");
|
||||
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
|
||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
@ -723,4 +733,16 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
|
|||
// uv_async_send(pThrd->workerAsync);
|
||||
}
|
||||
|
||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
|
||||
SSrvConn* pConn = thandle;
|
||||
struct sockaddr* pPeerName = &pConn->peername;
|
||||
|
||||
struct sockaddr_in caddr = *(struct sockaddr_in*)(pPeerName);
|
||||
pInfo->clientIp = (uint32_t)(caddr.sin_addr.s_addr);
|
||||
pInfo->clientPort = ntohs(caddr.sin_port);
|
||||
|
||||
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -2,6 +2,7 @@ add_executable(transportTest "")
|
|||
add_executable(client "")
|
||||
add_executable(server "")
|
||||
add_executable(transUT "")
|
||||
add_executable(syncClient "")
|
||||
|
||||
target_sources(transUT
|
||||
PRIVATE
|
||||
|
@ -20,6 +21,10 @@ target_sources (server
|
|||
PRIVATE
|
||||
"rserver.c"
|
||||
)
|
||||
target_sources (syncClient
|
||||
PRIVATE
|
||||
"syncClient.c"
|
||||
)
|
||||
|
||||
target_include_directories(transportTest
|
||||
PUBLIC
|
||||
|
@ -67,7 +72,6 @@ target_include_directories(transUT
|
|||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries (server
|
||||
os
|
||||
util
|
||||
|
@ -75,4 +79,17 @@ target_link_libraries (server
|
|||
gtest_main
|
||||
transport
|
||||
)
|
||||
target_include_directories(syncClient
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_link_libraries (syncClient
|
||||
os
|
||||
util
|
||||
common
|
||||
gtest_main
|
||||
transport
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -33,7 +33,6 @@ typedef struct {
|
|||
pthread_t thread;
|
||||
void * pRpc;
|
||||
} SInfo;
|
||||
|
||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||
|
|
|
@ -0,0 +1,220 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include <sys/time.h>
|
||||
|
||||
#include <tep.h>
|
||||
#include "os.h"
|
||||
#include "rpcLog.h"
|
||||
#include "taoserror.h"
|
||||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
#include "tutil.h"
|
||||
|
||||
typedef struct {
|
||||
int index;
|
||||
SEpSet epSet;
|
||||
int num;
|
||||
int numOfReqs;
|
||||
int msgSize;
|
||||
tsem_t rspSem;
|
||||
tsem_t * pOverSem;
|
||||
pthread_t thread;
|
||||
void * pRpc;
|
||||
} SInfo;
|
||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||
pMsg->code);
|
||||
|
||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
// tsem_post(&pInfo->rspSem);
|
||||
tsem_post(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
static int tcount = 0;
|
||||
|
||||
static void *sendRequest(void *param) {
|
||||
SInfo * pInfo = (SInfo *)param;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
||||
tDebug("thread:%d, start to send request", pInfo->index);
|
||||
|
||||
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
|
||||
int u100 = 0;
|
||||
int u500 = 0;
|
||||
int u1000 = 0;
|
||||
int u10000 = 0;
|
||||
SRpcMsg respMsg = {0};
|
||||
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||
pInfo->num++;
|
||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||
rpcMsg.contLen = pInfo->msgSize;
|
||||
rpcMsg.ahandle = pInfo;
|
||||
rpcMsg.msgType = 1;
|
||||
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||
int64_t start = taosGetTimestampUs();
|
||||
rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &respMsg);
|
||||
// rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
// tsem_wait(&pInfo->rspSem);
|
||||
// wtsem_wait(&pInfo->rspSem);
|
||||
int64_t end = taosGetTimestampUs() - start;
|
||||
if (end <= 100) {
|
||||
u100++;
|
||||
} else if (end > 100 && end <= 500) {
|
||||
u500++;
|
||||
} else if (end > 500 && end < 1000) {
|
||||
u1000++;
|
||||
} else {
|
||||
u10000++;
|
||||
}
|
||||
|
||||
tDebug("recv response succefully");
|
||||
|
||||
// usleep(100000000);
|
||||
}
|
||||
|
||||
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
|
||||
tDebug("thread:%d, it is over", pInfo->index);
|
||||
tcount++;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
SRpcInit rpcInit;
|
||||
SEpSet epSet = {0};
|
||||
int msgSize = 128;
|
||||
int numOfReqs = 0;
|
||||
int appThreads = 1;
|
||||
char serverIp[40] = "127.0.0.1";
|
||||
char secret[20] = "mypassword";
|
||||
struct timeval systemTime;
|
||||
int64_t startTime, endTime;
|
||||
pthread_attr_t thattr;
|
||||
|
||||
// server info
|
||||
epSet.inUse = 0;
|
||||
addEpIntoEpSet(&epSet, serverIp, 7000);
|
||||
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
||||
|
||||
// client info
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "APP";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processResponse;
|
||||
rpcInit.sessions = 100;
|
||||
rpcInit.idleTime = 100;
|
||||
rpcInit.user = "michael";
|
||||
rpcInit.secret = secret;
|
||||
rpcInit.ckey = "key";
|
||||
rpcInit.spi = 1;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
epSet.eps[0].port = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
|
||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||
msgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||
rpcInit.sessions = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
||||
numOfReqs = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
||||
appThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||
tsCompressMsgSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||
rpcInit.user = argv[++i];
|
||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||
rpcInit.secret = argv[++i];
|
||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||
rpcInit.spi = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
|
||||
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
||||
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
taosInitLog("client.log", 100000, 10);
|
||||
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
tError("failed to initialize RPC");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tInfo("client is initialized");
|
||||
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i = 0; i < appThreads; ++i) {
|
||||
pInfo->index = i;
|
||||
pInfo->epSet = epSet;
|
||||
pInfo->numOfReqs = numOfReqs;
|
||||
pInfo->msgSize = msgSize;
|
||||
tsem_init(&pInfo->rspSem, 0, 0);
|
||||
pInfo->pRpc = pRpc;
|
||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||
pInfo++;
|
||||
}
|
||||
|
||||
do {
|
||||
usleep(1);
|
||||
} while (tcount < appThreads);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
float usedTime = (endTime - startTime) / 1000.0f; // mseconds
|
||||
|
||||
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
||||
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||
|
||||
int ch = getchar();
|
||||
UNUSED(ch);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -15,6 +15,7 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include "tep.h"
|
||||
#include "trpc.h"
|
||||
using namespace std;
|
||||
|
||||
|
@ -50,6 +51,25 @@ class TransObj {
|
|||
trans = rpcOpen(&rpcInit);
|
||||
return trans != NULL ? true : false;
|
||||
}
|
||||
|
||||
bool sendAndRecv() {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 0;
|
||||
addEpIntoEpSet(&epSet, "192.168.1.1", 7000);
|
||||
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
||||
|
||||
if (trans == NULL) {
|
||||
return false;
|
||||
}
|
||||
SRpcMsg rpcMsg = {0}, reqMsg = {0};
|
||||
reqMsg.pCont = rpcMallocCont(10);
|
||||
reqMsg.contLen = 10;
|
||||
reqMsg.ahandle = NULL;
|
||||
rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg);
|
||||
int code = rpcMsg.code;
|
||||
std::cout << tstrerror(code) << std::endl;
|
||||
return true;
|
||||
}
|
||||
bool stop() {
|
||||
rpcClose(trans);
|
||||
trans = NULL;
|
||||
|
@ -75,6 +95,7 @@ class TransEnv : public ::testing::Test {
|
|||
};
|
||||
TEST_F(TransEnv, test_start_stop) {
|
||||
assert(tr->startCli());
|
||||
assert(tr->sendAndRecv());
|
||||
assert(tr->stop());
|
||||
|
||||
assert(tr->startSrv());
|
||||
|
|
Loading…
Reference in New Issue