Merge pull request #10212 from taosdata/feature/trans_impl
fix push crashed
This commit is contained in:
commit
ef2f9a3e41
|
@ -29,6 +29,8 @@ extern "C" {
|
||||||
|
|
||||||
extern int tsRpcHeadSize;
|
extern int tsRpcHeadSize;
|
||||||
|
|
||||||
|
typedef struct SRpcPush SRpcPush;
|
||||||
|
|
||||||
typedef struct SRpcConnInfo {
|
typedef struct SRpcConnInfo {
|
||||||
uint32_t clientIp;
|
uint32_t clientIp;
|
||||||
uint16_t clientPort;
|
uint16_t clientPort;
|
||||||
|
@ -43,8 +45,17 @@ typedef struct SRpcMsg {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void * handle; // rpc handle returned to app
|
void * handle; // rpc handle returned to app
|
||||||
void * ahandle; // app handle set by client
|
void * ahandle; // app handle set by client
|
||||||
|
int persist; // keep handle or not, default 0
|
||||||
|
|
||||||
|
SRpcPush *push;
|
||||||
|
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
|
typedef struct SRpcPush {
|
||||||
|
void *arg;
|
||||||
|
int (*callback)(void *arg, SRpcMsg *rpcMsg);
|
||||||
|
} SRpcPush;
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
uint16_t localPort; // local port
|
uint16_t localPort; // local port
|
||||||
char * label; // for debug purpose
|
char * label; // for debug purpose
|
||||||
|
@ -83,6 +94,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||||
void rpcCancelRequest(int64_t rid);
|
void rpcCancelRequest(int64_t rid);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -27,12 +27,16 @@ typedef struct SCliConn {
|
||||||
SConnBuffer readBuf;
|
SConnBuffer readBuf;
|
||||||
void* data;
|
void* data;
|
||||||
queue conn;
|
queue conn;
|
||||||
char spi;
|
|
||||||
char secured;
|
|
||||||
uint64_t expireTime;
|
uint64_t expireTime;
|
||||||
int8_t notifyCount; // timers already notify to client
|
int8_t notifyCount; // timers already notify to client
|
||||||
int32_t ref;
|
|
||||||
|
|
||||||
|
SRpcPush* push;
|
||||||
|
int persist; //
|
||||||
|
// spi configure
|
||||||
|
char spi;
|
||||||
|
char secured;
|
||||||
|
int32_t ref;
|
||||||
|
// debug and log info
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
|
@ -128,6 +132,10 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
|
tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
|
||||||
ntohs(conn->addr.sin_port));
|
ntohs(conn->addr.sin_port));
|
||||||
|
|
||||||
|
if (conn->push != NULL && conn->notifyCount != 0) {
|
||||||
|
(*conn->push->callback)(conn->push->arg, &rpcMsg);
|
||||||
|
} else {
|
||||||
if (pCtx->pSem == NULL) {
|
if (pCtx->pSem == NULL) {
|
||||||
tTrace("client conn(sync) %p handle resp", conn);
|
tTrace("client conn(sync) %p handle resp", conn);
|
||||||
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
|
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
|
||||||
|
@ -136,6 +144,7 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
conn->notifyCount += 1;
|
conn->notifyCount += 1;
|
||||||
|
|
||||||
// buf's mem alread translated to rpcMsg.pCont
|
// buf's mem alread translated to rpcMsg.pCont
|
||||||
|
@ -144,7 +153,10 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
||||||
|
|
||||||
SCliThrdObj* pThrd = conn->hostThrd;
|
SCliThrdObj* pThrd = conn->hostThrd;
|
||||||
|
// user owns conn->persist = 1
|
||||||
|
if (conn->push != NULL) {
|
||||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||||
|
}
|
||||||
|
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
conn->data = NULL;
|
conn->data = NULL;
|
||||||
|
@ -154,7 +166,7 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientHandleExcept(SCliConn* pConn) {
|
static void clientHandleExcept(SCliConn* pConn) {
|
||||||
if (pConn->data == NULL) {
|
if (pConn->data == NULL && pConn->push == NULL) {
|
||||||
// handle conn except in conn pool
|
// handle conn except in conn pool
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
return;
|
return;
|
||||||
|
@ -162,21 +174,26 @@ static void clientHandleExcept(SCliConn* pConn) {
|
||||||
tTrace("client conn %p start to destroy", pConn);
|
tTrace("client conn %p start to destroy", pConn);
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
|
|
||||||
destroyUserdata(&pMsg->msg);
|
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
|
|
||||||
|
if (pConn->push != NULL && pConn->notifyCount != 0) {
|
||||||
|
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
|
||||||
|
} else {
|
||||||
if (pCtx->pSem == NULL) {
|
if (pCtx->pSem == NULL) {
|
||||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
|
||||||
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
|
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
||||||
// SRpcMsg rpcMsg
|
// SRpcMsg rpcMsg
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
}
|
}
|
||||||
|
if (pConn->push != NULL) {
|
||||||
|
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
pConn->data = NULL;
|
pConn->data = NULL;
|
||||||
|
@ -411,6 +428,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tTrace("client msg tran time cost: %" PRIu64 "", el);
|
tTrace("client msg tran time cost: %" PRIu64 "", el);
|
||||||
et = taosGetTimestampUs();
|
et = taosGetTimestampUs();
|
||||||
|
|
||||||
|
// if (pMsg->msg.handle != NULL) {
|
||||||
|
// // handle
|
||||||
|
//}
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
|
@ -426,6 +447,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
}
|
}
|
||||||
clientWrite(conn);
|
clientWrite(conn);
|
||||||
|
|
||||||
|
conn->push = pMsg->msg.push;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
||||||
conn->ref++;
|
conn->ref++;
|
||||||
|
@ -444,6 +467,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
|
|
||||||
|
conn->push = pMsg->msg.push;
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||||
// handle error in callback if fail to connect
|
// handle error in callback if fail to connect
|
||||||
|
|
|
@ -0,0 +1,239 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include <sys/time.h>
|
||||||
|
|
||||||
|
#include <tep.h>
|
||||||
|
#include "os.h"
|
||||||
|
#include "rpcLog.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int index;
|
||||||
|
SEpSet epSet;
|
||||||
|
int num;
|
||||||
|
int numOfReqs;
|
||||||
|
int msgSize;
|
||||||
|
tsem_t rspSem;
|
||||||
|
tsem_t * pOverSem;
|
||||||
|
pthread_t thread;
|
||||||
|
void * pRpc;
|
||||||
|
} SInfo;
|
||||||
|
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||||
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
|
pMsg->code);
|
||||||
|
|
||||||
|
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
// tsem_post(&pInfo->rspSem);
|
||||||
|
tsem_post(&pInfo->rspSem);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tcount = 0;
|
||||||
|
|
||||||
|
typedef struct SPushArg {
|
||||||
|
tsem_t sem;
|
||||||
|
|
||||||
|
} SPushArg;
|
||||||
|
|
||||||
|
int pushCallback(void *arg, SRpcMsg *msg) {
|
||||||
|
SPushArg *push = arg;
|
||||||
|
tsem_post(&push->sem);
|
||||||
|
}
|
||||||
|
SRpcPush *createPushArg() {
|
||||||
|
SRpcPush *push = calloc(1, sizeof(SRpcPush));
|
||||||
|
push->arg = calloc(1, sizeof(SPushArg));
|
||||||
|
tsem_init(&push->arg->sem, 0, 0);
|
||||||
|
push->callback = pushCallback;
|
||||||
|
return push;
|
||||||
|
}
|
||||||
|
static void *sendRequest(void *param) {
|
||||||
|
SInfo * pInfo = (SInfo *)param;
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
|
||||||
|
tDebug("thread:%d, start to send request", pInfo->index);
|
||||||
|
|
||||||
|
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
|
||||||
|
int u100 = 0;
|
||||||
|
int u500 = 0;
|
||||||
|
int u1000 = 0;
|
||||||
|
int u10000 = 0;
|
||||||
|
|
||||||
|
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||||
|
SRpcPush *push = createPushArg();
|
||||||
|
pInfo->num++;
|
||||||
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
|
rpcMsg.ahandle = pInfo;
|
||||||
|
rpcMsg.msgType = 1;
|
||||||
|
rpcMsg.push = push;
|
||||||
|
;
|
||||||
|
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
|
int64_t start = taosGetTimestampUs();
|
||||||
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
|
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
|
tsem_wait(&pInfo->rspSem);
|
||||||
|
// tsem_wait(&pInfo->rspSem);
|
||||||
|
tsem_wait(&push->sem);
|
||||||
|
int64_t end = taosGetTimestampUs() - start;
|
||||||
|
if (end <= 100) {
|
||||||
|
u100++;
|
||||||
|
} else if (end > 100 && end <= 500) {
|
||||||
|
u500++;
|
||||||
|
} else if (end > 500 && end < 1000) {
|
||||||
|
u1000++;
|
||||||
|
} else {
|
||||||
|
u10000++;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDebug("recv response succefully");
|
||||||
|
|
||||||
|
// usleep(100000000);
|
||||||
|
}
|
||||||
|
|
||||||
|
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
|
||||||
|
tDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
tcount++;
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
SEpSet epSet;
|
||||||
|
int msgSize = 128;
|
||||||
|
int numOfReqs = 0;
|
||||||
|
int appThreads = 1;
|
||||||
|
char serverIp[40] = "127.0.0.1";
|
||||||
|
char secret[20] = "mypassword";
|
||||||
|
struct timeval systemTime;
|
||||||
|
int64_t startTime, endTime;
|
||||||
|
pthread_attr_t thattr;
|
||||||
|
|
||||||
|
// server info
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, serverIp, 7000);
|
||||||
|
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
||||||
|
|
||||||
|
// client info
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "APP";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = processResponse;
|
||||||
|
rpcInit.sessions = 100;
|
||||||
|
rpcInit.idleTime = 100;
|
||||||
|
rpcInit.user = "michael";
|
||||||
|
rpcInit.secret = secret;
|
||||||
|
rpcInit.ckey = "key";
|
||||||
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
|
for (int i = 1; i < argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
epSet.eps[0].port = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||||
|
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
|
||||||
|
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||||
|
msgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.sessions = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
||||||
|
numOfReqs = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
||||||
|
appThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||||
|
tsCompressMsgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.user = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.secret = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.spi = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||||
|
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
|
||||||
|
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||||
|
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||||
|
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||||
|
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||||
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
|
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||||
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
|
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
||||||
|
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("client.log", 100000, 10);
|
||||||
|
|
||||||
|
void *pRpc = rpcOpen(&rpcInit);
|
||||||
|
if (pRpc == NULL) {
|
||||||
|
tError("failed to initialize RPC");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tInfo("client is initialized");
|
||||||
|
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
|
||||||
|
|
||||||
|
gettimeofday(&systemTime, NULL);
|
||||||
|
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||||
|
|
||||||
|
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
|
||||||
|
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
for (int i = 0; i < appThreads; ++i) {
|
||||||
|
pInfo->index = i;
|
||||||
|
pInfo->epSet = epSet;
|
||||||
|
pInfo->numOfReqs = numOfReqs;
|
||||||
|
pInfo->msgSize = msgSize;
|
||||||
|
tsem_init(&pInfo->rspSem, 0, 0);
|
||||||
|
pInfo->pRpc = pRpc;
|
||||||
|
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||||
|
pInfo++;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
usleep(1);
|
||||||
|
} while (tcount < appThreads);
|
||||||
|
|
||||||
|
gettimeofday(&systemTime, NULL);
|
||||||
|
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||||
|
float usedTime = (endTime - startTime) / 1000.0f; // mseconds
|
||||||
|
|
||||||
|
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
||||||
|
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||||
|
|
||||||
|
int ch = getchar();
|
||||||
|
UNUSED(ch);
|
||||||
|
|
||||||
|
taosCloseLog();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue