add bench to rpc
This commit is contained in:
parent
38672e395c
commit
6520db8584
|
@ -1,5 +1,4 @@
|
||||||
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
|
||||||
*
|
*
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
add_executable(transportTest "")
|
add_executable(transportTest "")
|
||||||
add_executable(transUT "")
|
add_executable(transUT "")
|
||||||
add_executable(pushServer "")
|
add_executable(svrBench "")
|
||||||
|
add_executable(cliBench "")
|
||||||
|
|
||||||
target_sources(transUT
|
target_sources(transUT
|
||||||
PRIVATE
|
PRIVATE
|
||||||
|
@ -12,9 +13,13 @@ target_sources(transportTest
|
||||||
"transportTests.cpp"
|
"transportTests.cpp"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_sources(pushServer
|
target_sources(svrBench
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"pushServer.c"
|
"svrBench.c"
|
||||||
|
)
|
||||||
|
target_sources(cliBench
|
||||||
|
PRIVATE
|
||||||
|
"cliBench.c"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(transportTest
|
target_include_directories(transportTest
|
||||||
|
@ -45,13 +50,37 @@ target_include_directories(transUT
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(pushServer
|
target_include_directories(svrBench
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(svrBench
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${TD_SOURCE_DIR}/include/libs/transport"
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries (pushServer
|
target_link_libraries (svrBench
|
||||||
|
os
|
||||||
|
util
|
||||||
|
common
|
||||||
|
gtest_main
|
||||||
|
transport
|
||||||
|
)
|
||||||
|
|
||||||
|
target_include_directories(cliBench
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(cliBench
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries (cliBench
|
||||||
os
|
os
|
||||||
util
|
util
|
||||||
common
|
common
|
||||||
|
|
|
@ -0,0 +1,182 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "transLog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int index;
|
||||||
|
SEpSet epSet;
|
||||||
|
int num;
|
||||||
|
int numOfReqs;
|
||||||
|
int msgSize;
|
||||||
|
tsem_t rspSem;
|
||||||
|
tsem_t *pOverSem;
|
||||||
|
TdThread thread;
|
||||||
|
void *pRpc;
|
||||||
|
} SInfo;
|
||||||
|
|
||||||
|
static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
|
||||||
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
|
pMsg->code);
|
||||||
|
|
||||||
|
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
tsem_post(&pInfo->rspSem);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tcount = 0;
|
||||||
|
|
||||||
|
static void *sendRequest(void *param) {
|
||||||
|
SInfo *pInfo = (SInfo *)param;
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
|
||||||
|
tDebug("thread:%d, start to send request", pInfo->index);
|
||||||
|
|
||||||
|
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||||
|
pInfo->num++;
|
||||||
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
|
rpcMsg.info.ahandle = pInfo;
|
||||||
|
rpcMsg.msgType = 1;
|
||||||
|
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
|
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
|
tsem_wait(&pInfo->rspSem);
|
||||||
|
}
|
||||||
|
|
||||||
|
tDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
tcount++;
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
SEpSet epSet;
|
||||||
|
int msgSize = 128;
|
||||||
|
int numOfReqs = 0;
|
||||||
|
int appThreads = 1;
|
||||||
|
char serverIp[40] = "127.0.0.1";
|
||||||
|
struct timeval systemTime;
|
||||||
|
int64_t startTime, endTime;
|
||||||
|
|
||||||
|
// server info
|
||||||
|
epSet.numOfEps = 1;
|
||||||
|
epSet.inUse = 0;
|
||||||
|
epSet.eps[0].port = 7000;
|
||||||
|
epSet.eps[1].port = 7000;
|
||||||
|
strcpy(epSet.eps[0].fqdn, serverIp);
|
||||||
|
strcpy(epSet.eps[1].fqdn, "192.168.0.1");
|
||||||
|
|
||||||
|
// client info
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "APP";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = processResponse;
|
||||||
|
rpcInit.sessions = 100;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
rpcInit.user = "michael";
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
|
rpcDebugFlag = 131;
|
||||||
|
for (int i = 1; i < argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||||
|
msgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.sessions = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
||||||
|
numOfReqs = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
||||||
|
appThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||||
|
tsCompressMsgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||||
|
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||||
|
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||||
|
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||||
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosInitLog("client.log", 100000);
|
||||||
|
|
||||||
|
void *pRpc = rpcOpen(&rpcInit);
|
||||||
|
if (pRpc == NULL) {
|
||||||
|
tError("failed to initialize RPC");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tInfo("client is initialized");
|
||||||
|
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
|
||||||
|
|
||||||
|
int64_t now = taosGetTimestampUs();
|
||||||
|
|
||||||
|
SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
|
||||||
|
SInfo *p = pInfo;
|
||||||
|
for (int i = 0; i < appThreads; ++i) {
|
||||||
|
pInfo->index = i;
|
||||||
|
pInfo->epSet = epSet;
|
||||||
|
pInfo->numOfReqs = numOfReqs;
|
||||||
|
pInfo->msgSize = msgSize;
|
||||||
|
tsem_init(&pInfo->rspSem, 0, 0);
|
||||||
|
pInfo->pRpc = pRpc;
|
||||||
|
|
||||||
|
taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo);
|
||||||
|
pInfo++;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
taosUsleep(1);
|
||||||
|
} while (tcount < appThreads);
|
||||||
|
|
||||||
|
float usedTime = (taosGetTimestampUs() - now) / 1000.0f;
|
||||||
|
|
||||||
|
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
||||||
|
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||||
|
|
||||||
|
for (int i = 0; i < appThreads; i++) {
|
||||||
|
SInfo *pInfo = p;
|
||||||
|
taosThreadJoin(pInfo->thread, NULL);
|
||||||
|
p++;
|
||||||
|
}
|
||||||
|
int ch = getchar();
|
||||||
|
UNUSED(ch);
|
||||||
|
|
||||||
|
taosCloseLog();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -24,12 +24,12 @@ int msgSize = 128;
|
||||||
int commit = 0;
|
int commit = 0;
|
||||||
TdFilePtr pDataFile = NULL;
|
TdFilePtr pDataFile = NULL;
|
||||||
STaosQueue *qhandle = NULL;
|
STaosQueue *qhandle = NULL;
|
||||||
STaosQset * qset = NULL;
|
STaosQset *qset = NULL;
|
||||||
|
|
||||||
void processShellMsg() {
|
void processShellMsg() {
|
||||||
static int num = 0;
|
static int num = 0;
|
||||||
STaosQall *qall;
|
STaosQall *qall;
|
||||||
SRpcMsg * pRpcMsg, rpcMsg;
|
SRpcMsg *pRpcMsg, rpcMsg;
|
||||||
int type;
|
int type;
|
||||||
SQueueInfo qinfo = {0};
|
SQueueInfo qinfo = {0};
|
||||||
|
|
||||||
|
@ -77,7 +77,6 @@ void processShellMsg() {
|
||||||
taosFreeQitem(pRpcMsg);
|
taosFreeQitem(pRpcMsg);
|
||||||
|
|
||||||
{
|
{
|
||||||
// taosSsleep(1);
|
|
||||||
SRpcMsg nRpcMsg = {0};
|
SRpcMsg nRpcMsg = {0};
|
||||||
nRpcMsg.pCont = rpcMallocCont(msgSize);
|
nRpcMsg.pCont = rpcMallocCont(msgSize);
|
||||||
nRpcMsg.contLen = msgSize;
|
nRpcMsg.contLen = msgSize;
|
||||||
|
@ -93,26 +92,6 @@ void processShellMsg() {
|
||||||
taosFreeQall(qall);
|
taosFreeQall(qall);
|
||||||
}
|
}
|
||||||
|
|
||||||
int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
|
|
||||||
// app shall retrieve the auth info based on meterID from DB or a data file
|
|
||||||
// demo code here only for simple demo
|
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
if (strcmp(meterId, "michael") == 0) {
|
|
||||||
*spi = 1;
|
|
||||||
*encrypt = 0;
|
|
||||||
strcpy(secret, "mypassword");
|
|
||||||
strcpy(ckey, "key");
|
|
||||||
} else if (strcmp(meterId, "jeff") == 0) {
|
|
||||||
*spi = 0;
|
|
||||||
*encrypt = 0;
|
|
||||||
} else {
|
|
||||||
ret = -1; // user not there
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SRpcMsg *pTemp;
|
SRpcMsg *pTemp;
|
||||||
|
|
||||||
|
@ -131,11 +110,12 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localPort = 7000;
|
rpcInit.localPort = 7000;
|
||||||
|
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
|
||||||
rpcInit.label = "SER";
|
rpcInit.label = "SER";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = processRequestMsg;
|
rpcInit.cfp = processRequestMsg;
|
||||||
rpcInit.sessions = 1000;
|
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
|
rpcDebugFlag = 131;
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
@ -170,7 +150,7 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
rpcInit.connType = TAOS_CONN_SERVER;
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
taosInitLog("server.log", 10);
|
taosInitLog("server.log", 100000);
|
||||||
|
|
||||||
void *pRpc = rpcOpen(&rpcInit);
|
void *pRpc = rpcOpen(&rpcInit);
|
||||||
if (pRpc == NULL) {
|
if (pRpc == NULL) {
|
|
@ -16,8 +16,8 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tutil.h"
|
|
||||||
#include "tconfig.h"
|
#include "tconfig.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
#define LOG_MAX_LINE_SIZE (1024)
|
#define LOG_MAX_LINE_SIZE (1024)
|
||||||
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
||||||
|
@ -40,7 +40,7 @@
|
||||||
#define LOG_BUF_MUTEX(x) ((x)->buffMutex)
|
#define LOG_BUF_MUTEX(x) ((x)->buffMutex)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char * buffer;
|
char *buffer;
|
||||||
int32_t buffStart;
|
int32_t buffStart;
|
||||||
int32_t buffEnd;
|
int32_t buffEnd;
|
||||||
int32_t buffSize;
|
int32_t buffSize;
|
||||||
|
@ -59,7 +59,7 @@ typedef struct {
|
||||||
int32_t openInProgress;
|
int32_t openInProgress;
|
||||||
pid_t pid;
|
pid_t pid;
|
||||||
char logName[LOG_FILE_NAME_LEN];
|
char logName[LOG_FILE_NAME_LEN];
|
||||||
SLogBuff * logHandle;
|
SLogBuff *logHandle;
|
||||||
TdThreadMutex logMutex;
|
TdThreadMutex logMutex;
|
||||||
} SLogObj;
|
} SLogObj;
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ int64_t dbgSmallWN = 0;
|
||||||
int64_t dbgBigWN = 0;
|
int64_t dbgBigWN = 0;
|
||||||
int64_t dbgWSize = 0;
|
int64_t dbgWSize = 0;
|
||||||
|
|
||||||
static void * taosAsyncOutputLog(void *param);
|
static void *taosAsyncOutputLog(void *param);
|
||||||
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
|
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
|
||||||
static SLogBuff *taosLogBuffNew(int32_t bufSize);
|
static SLogBuff *taosLogBuffNew(int32_t bufSize);
|
||||||
static void taosCloseLogByFd(TdFilePtr pFile);
|
static void taosCloseLogByFd(TdFilePtr pFile);
|
||||||
|
@ -128,7 +128,11 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles) {
|
||||||
osUpdate();
|
osUpdate();
|
||||||
|
|
||||||
char fullName[PATH_MAX] = {0};
|
char fullName[PATH_MAX] = {0};
|
||||||
|
if (strlen(tsLogDir) != 0) {
|
||||||
snprintf(fullName, PATH_MAX, "%s" TD_DIRSEP "%s", tsLogDir, logName);
|
snprintf(fullName, PATH_MAX, "%s" TD_DIRSEP "%s", tsLogDir, logName);
|
||||||
|
} else {
|
||||||
|
snprintf(fullName, PATH_MAX, "%s", logName);
|
||||||
|
}
|
||||||
|
|
||||||
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
|
tsLogObj.logHandle = taosLogBuffNew(LOG_DEFAULT_BUF_SIZE);
|
||||||
if (tsLogObj.logHandle == NULL) return -1;
|
if (tsLogObj.logHandle == NULL) return -1;
|
||||||
|
@ -704,7 +708,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
||||||
int32_t compressSize = 163840;
|
int32_t compressSize = 163840;
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char * data = taosMemoryMalloc(compressSize);
|
char *data = taosMemoryMalloc(compressSize);
|
||||||
// gzFile dstFp = NULL;
|
// gzFile dstFp = NULL;
|
||||||
|
|
||||||
// srcFp = fopen(srcFileName, "r");
|
// srcFp = fopen(srcFileName, "r");
|
||||||
|
|
Loading…
Reference in New Issue