add test case
This commit is contained in:
parent
456ed98e17
commit
97c9dba167
|
@ -1,5 +1,6 @@
|
|||
add_executable(transportTest "")
|
||||
add_executable(transUT "")
|
||||
add_executable(transUT2 "")
|
||||
add_executable(svrBench "")
|
||||
add_executable(cliBench "")
|
||||
add_executable(httpBench "")
|
||||
|
@ -9,6 +10,10 @@ target_sources(transUT
|
|||
"transUT.cpp"
|
||||
)
|
||||
|
||||
target_sources(transUT2
|
||||
PRIVATE
|
||||
"transUT2.cpp"
|
||||
)
|
||||
target_sources(transportTest
|
||||
PRIVATE
|
||||
"transportTests.cpp"
|
||||
|
@ -56,6 +61,20 @@ target_include_directories(transUT
|
|||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
target_link_libraries (transUT2
|
||||
os
|
||||
util
|
||||
common
|
||||
gtest_main
|
||||
transport
|
||||
)
|
||||
|
||||
target_include_directories(transUT2
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
target_include_directories(svrBench
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||
|
|
|
@ -53,8 +53,6 @@ static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||
pMsg->code);
|
||||
|
||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
tsem_post(&pInfo->rspSem);
|
||||
}
|
||||
|
@ -72,12 +70,12 @@ static void *sendRequest(void *param) {
|
|||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||
rpcMsg.contLen = pInfo->msgSize;
|
||||
rpcMsg.info.ahandle = pInfo;
|
||||
rpcMsg.info.noResp = 1;
|
||||
rpcMsg.info.noResp = 0;
|
||||
rpcMsg.msgType = 1;
|
||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||
// tsem_wait(&pInfo->rspSem);
|
||||
tsem_wait(&pInfo->rspSem);
|
||||
}
|
||||
|
||||
tDebug("thread:%d, it is over", pInfo->index);
|
||||
|
@ -110,17 +108,15 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.label = "APP";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processResponse;
|
||||
rpcInit.sessions = 100;
|
||||
rpcInit.sessions = 1000;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.user = "michael";
|
||||
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.connLimitNum = 10;
|
||||
rpcInit.connLimitLock = 1;
|
||||
rpcInit.shareConnLimit = 16 * 1024;
|
||||
rpcInit.shareConnLimit = tsShareConnLimit;
|
||||
rpcInit.supportBatch = 1;
|
||||
|
||||
rpcDebugFlag = 135;
|
||||
rpcInit.compressSize = -1;
|
||||
rpcDebugFlag = 143;
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||
|
@ -139,6 +135,10 @@ int main(int argc, char *argv[]) {
|
|||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||
} else if (strcmp(argv[i], "-l") == 0 && i < argc - 1) {
|
||||
rpcInit.shareConnLimit = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-c") == 0 && i < argc - 1) {
|
||||
rpcInit.compressSize = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||
rpcDebugFlag = atoi(argv[++i]);
|
||||
} else {
|
||||
|
@ -150,6 +150,8 @@ int main(int argc, char *argv[]) {
|
|||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||
printf(" [-c compressSize]: compress size, default:%d\n", tsCompressMsgSize);
|
||||
printf(" [-l shareConnLimit]: share conn limit, default:%d\n", tsShareConnLimit);
|
||||
printf(" [-h help]: print out this help\n\n");
|
||||
exit(0);
|
||||
}
|
||||
|
@ -168,18 +170,18 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
int64_t now = taosGetTimestampUs();
|
||||
|
||||
SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
|
||||
SInfo *p = pInfo;
|
||||
SInfo **pInfo = (SInfo **)taosMemoryCalloc(1, sizeof(SInfo *) * appThreads);
|
||||
for (int i = 0; i < appThreads; ++i) {
|
||||
pInfo->index = i;
|
||||
pInfo->epSet = epSet;
|
||||
pInfo->numOfReqs = numOfReqs;
|
||||
pInfo->msgSize = msgSize;
|
||||
tsem_init(&pInfo->rspSem, 0, 0);
|
||||
pInfo->pRpc = pRpc;
|
||||
SInfo *p = taosMemoryCalloc(1, sizeof(SInfo));
|
||||
p->index = i;
|
||||
p->epSet = epSet;
|
||||
p->numOfReqs = numOfReqs;
|
||||
p->msgSize = msgSize;
|
||||
tsem_init(&p->rspSem, 0, 0);
|
||||
p->pRpc = pRpc;
|
||||
pInfo[i] = p;
|
||||
|
||||
taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo);
|
||||
pInfo++;
|
||||
taosThreadCreate(&p->thread, NULL, sendRequest, pInfo[i]);
|
||||
}
|
||||
|
||||
do {
|
||||
|
@ -192,12 +194,14 @@ int main(int argc, char *argv[]) {
|
|||
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||
|
||||
for (int i = 0; i < appThreads; i++) {
|
||||
SInfo *pInfo = p;
|
||||
taosThreadJoin(pInfo->thread, NULL);
|
||||
p++;
|
||||
SInfo *p = pInfo[i];
|
||||
taosThreadJoin(p->thread, NULL);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
int ch = getchar();
|
||||
UNUSED(ch);
|
||||
taosMemoryFree(pInfo);
|
||||
|
||||
// int ch = getchar();
|
||||
// UNUSED(ch);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
|
|
|
@ -76,23 +76,6 @@ void *processShellMsg(void *arg) {
|
|||
|
||||
for (int i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||
|
||||
if (pDataFile != NULL) {
|
||||
if (taosWriteFile(pDataFile, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
|
||||
tInfo("failed to write data file, reason:%s", strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (commit >= 2) {
|
||||
num += numOfMsgs;
|
||||
// if (taosFsync(pDataFile) < 0) {
|
||||
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
|
||||
//}
|
||||
|
||||
if (num % 10000 == 0) {
|
||||
tInfo("%d request have been written into disk", num);
|
||||
}
|
||||
}
|
||||
|
||||
taosResetQitems(qall);
|
||||
|
@ -107,16 +90,7 @@ void *processShellMsg(void *arg) {
|
|||
rpcMsg.code = 0;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
|
||||
void *handle = pRpcMsg->info.handle;
|
||||
taosFreeQitem(pRpcMsg);
|
||||
//{
|
||||
// SRpcMsg nRpcMsg = {0};
|
||||
// nRpcMsg.pCont = rpcMallocCont(msgSize);
|
||||
// nRpcMsg.contLen = msgSize;
|
||||
// nRpcMsg.info.handle = handle;
|
||||
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
|
||||
// rpcSendResponse(&nRpcMsg);
|
||||
//}
|
||||
}
|
||||
|
||||
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
||||
|
@ -149,12 +123,13 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.localPort = 7000;
|
||||
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
|
||||
rpcInit.label = "SER";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.numOfThreads = 10;
|
||||
rpcInit.cfp = processRequestMsg;
|
||||
rpcInit.idleTime = 2 * 1500;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
rpcDebugFlag = 131;
|
||||
rpcInit.compressSize = -1;
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||
|
@ -205,8 +180,8 @@ int main(int argc, char *argv[]) {
|
|||
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||
}
|
||||
|
||||
int32_t numOfAthread = 5;
|
||||
multiQ = taosMemoryMalloc(sizeof(numOfAthread));
|
||||
int32_t numOfAthread = 1;
|
||||
multiQ = taosMemoryMalloc(sizeof(MultiThreadQhandle));
|
||||
multiQ->numOfThread = numOfAthread;
|
||||
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
|
||||
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
|
||||
|
@ -221,11 +196,6 @@ int main(int argc, char *argv[]) {
|
|||
threads[i].idx = i;
|
||||
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
|
||||
}
|
||||
// qhandle = taosOpenQueue();
|
||||
// qset = taosOpenQset();
|
||||
// taosAddIntoQset(qset, qhandle, NULL);
|
||||
|
||||
// processShellMsg();
|
||||
|
||||
if (pDataFile != NULL) {
|
||||
taosCloseFile(&pDataFile);
|
||||
|
|
|
@ -54,6 +54,7 @@ class Client {
|
|||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.parent = this;
|
||||
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit_.shareConnLimit = 200;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
|
@ -85,6 +86,14 @@ class Client {
|
|||
SemWait();
|
||||
*resp = this->resp;
|
||||
}
|
||||
void sendReq(SRpcMsg *req) {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 0;
|
||||
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||
|
||||
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||
|
||||
}
|
||||
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||
if (req->info.handle != NULL) {
|
||||
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
||||
|
@ -160,6 +169,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
rpcMsg.contLen = 100;
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.code = 0;
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
|
||||
|
@ -264,6 +274,7 @@ class TransObj {
|
|||
cli->Stop();
|
||||
}
|
||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
|
||||
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||
|
||||
~TransObj() {
|
||||
|
@ -492,15 +503,16 @@ TEST_F(TransEnv, queryExcept) {
|
|||
TEST_F(TransEnv, noResp) {
|
||||
SRpcMsg resp = {0};
|
||||
SRpcMsg req = {0};
|
||||
// for (int i = 0; i < 5; i++) {
|
||||
// memset(&req, 0, sizeof(req));
|
||||
// req.info.noResp = 1;
|
||||
// req.msgType = 1;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
for (int i = 0; i < 500000; i++) {
|
||||
memset(&req, 0, sizeof(req));
|
||||
req.info.noResp = 1;
|
||||
req.msgType = 3;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
tr->cliSendReq(&req);
|
||||
//tr->cliSendAndRecv(&req, &resp);
|
||||
//}
|
||||
// taosMsleep(2000);
|
||||
}
|
||||
taosMsleep(2000);
|
||||
|
||||
// no resp
|
||||
}
|
||||
|
|
|
@ -0,0 +1,529 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
|
||||
* Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include <gtest/gtest.h>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlog.h"
|
||||
#include "tmisce.h"
|
||||
#include "transLog.h"
|
||||
#include "trpc.h"
|
||||
#include "tversion.h"
|
||||
using namespace std;
|
||||
|
||||
const char *label = "APP";
|
||||
const char *secret = "secret";
|
||||
const char *user = "user";
|
||||
const char *ckey = "ckey";
|
||||
|
||||
class Server;
|
||||
int port = 7000;
|
||||
// server process
|
||||
// server except
|
||||
|
||||
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
|
||||
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
// client process;
|
||||
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
class Client {
|
||||
public:
|
||||
void Init(int nThread) {
|
||||
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
|
||||
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||
rpcInit_.localPort = 0;
|
||||
rpcInit_.label = (char *)"client";
|
||||
rpcInit_.numOfThreads = nThread;
|
||||
rpcInit_.cfp = processResp;
|
||||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.parent = this;
|
||||
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit_.shareConnLimit = 200;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
//tsem_init(&this->sem, 0, 0);
|
||||
}
|
||||
void SetResp(SRpcMsg *pMsg) {
|
||||
// set up resp;
|
||||
this->resp = *pMsg;
|
||||
}
|
||||
SRpcMsg *Resp() { return &this->resp; }
|
||||
|
||||
void Restart(CB cb) {
|
||||
rpcClose(this->transCli);
|
||||
rpcInit_.cfp = cb;
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void Stop() {
|
||||
rpcClose(this->transCli);
|
||||
this->transCli = NULL;
|
||||
}
|
||||
|
||||
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 0;
|
||||
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||
|
||||
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||
SemWait();
|
||||
*resp = this->resp;
|
||||
}
|
||||
void sendReq(SRpcMsg *req) {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 0;
|
||||
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||
|
||||
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||
}
|
||||
|
||||
void sendReqWithId(SRpcMsg *req, int64_t *id) {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 0;
|
||||
addEpIntoEpSet(&epSet, "127.0.0.1",7000);
|
||||
rpcSendRequestWithCtx(this->transCli, &epSet, req, id, NULL);
|
||||
|
||||
}
|
||||
void freeId(int64_t *id) {
|
||||
rpcFreeConnById(this->transCli, *id);
|
||||
}
|
||||
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||
if (req->info.handle != NULL) {
|
||||
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
||||
req->info.handle = NULL;
|
||||
}
|
||||
SendAndRecv(req, resp);
|
||||
}
|
||||
|
||||
void SemWait() { tsem_wait(&this->sem); }
|
||||
void SemPost() { tsem_post(&this->sem); }
|
||||
void Reset() {}
|
||||
|
||||
~Client() {
|
||||
if (this->transCli) rpcClose(this->transCli);
|
||||
}
|
||||
|
||||
private:
|
||||
tsem_t sem;
|
||||
SRpcInit rpcInit_;
|
||||
void *transCli;
|
||||
SRpcMsg resp;
|
||||
};
|
||||
class Server {
|
||||
public:
|
||||
Server() {
|
||||
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
|
||||
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||
|
||||
memcpy(rpcInit_.localFqdn, "localhost", strlen("localhost"));
|
||||
rpcInit_.localPort = port;
|
||||
rpcInit_.label = (char *)"server";
|
||||
rpcInit_.numOfThreads = 5;
|
||||
rpcInit_.cfp = processReq;
|
||||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
}
|
||||
void Start() {
|
||||
this->transSrv = rpcOpen(&this->rpcInit_);
|
||||
taosMsleep(1000);
|
||||
}
|
||||
void SetSrvContinueSend(CB cb) {
|
||||
this->Stop();
|
||||
rpcInit_.cfp = cb;
|
||||
this->Start();
|
||||
}
|
||||
void Stop() {
|
||||
if (this->transSrv == NULL) return;
|
||||
rpcClose(this->transSrv);
|
||||
this->transSrv = NULL;
|
||||
}
|
||||
void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||
this->Stop();
|
||||
rpcInit_.cfp = cfp;
|
||||
this->Start();
|
||||
}
|
||||
void Restart() {
|
||||
this->Stop();
|
||||
this->Start();
|
||||
}
|
||||
~Server() {
|
||||
if (this->transSrv) rpcClose(this->transSrv);
|
||||
this->transSrv = NULL;
|
||||
}
|
||||
|
||||
private:
|
||||
SRpcInit rpcInit_;
|
||||
void *transSrv;
|
||||
};
|
||||
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = rpcMallocCont(100);
|
||||
rpcMsg.contLen = 100;
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.code = 0;
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
|
||||
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
// for (int i = 0; i < 10; i++) {
|
||||
// SRpcMsg rpcMsg = {0};
|
||||
// rpcMsg.pCont = rpcMallocCont(100);
|
||||
// rpcMsg.contLen = 100;
|
||||
// rpcMsg.info = pMsg->info;
|
||||
// rpcMsg.code = 0;
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
// }
|
||||
}
|
||||
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = rpcMallocCont(100);
|
||||
rpcMsg.contLen = 100;
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.code = 0;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
|
||||
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||
}
|
||||
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
// {
|
||||
// SRpcMsg rpcMsg1 = {0};
|
||||
// rpcMsg1.pCont = rpcMallocCont(100);
|
||||
// rpcMsg1.contLen = 100;
|
||||
// rpcMsg1.info = pMsg->info;
|
||||
// rpcMsg1.code = 0;
|
||||
// rpcRegisterBrokenLinkArg(&rpcMsg1);
|
||||
// }
|
||||
// taosMsleep(10);
|
||||
|
||||
// SRpcMsg rpcMsg = {0};
|
||||
// rpcMsg.pCont = rpcMallocCont(100);
|
||||
// rpcMsg.contLen = 100;
|
||||
// rpcMsg.info = pMsg->info;
|
||||
// rpcMsg.code = 0;
|
||||
// rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
// client process;
|
||||
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
Client *client = (Client *)parent;
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
||||
tGDebug("received resp %s",tstrerror(pMsg->code));
|
||||
}
|
||||
|
||||
static void initEnv() {
|
||||
dDebugFlag = 143;
|
||||
vDebugFlag = 0;
|
||||
mDebugFlag = 143;
|
||||
cDebugFlag = 0;
|
||||
jniDebugFlag = 0;
|
||||
tmrDebugFlag = 143;
|
||||
uDebugFlag = 143;
|
||||
rpcDebugFlag = 143;
|
||||
qDebugFlag = 0;
|
||||
wDebugFlag = 0;
|
||||
sDebugFlag = 0;
|
||||
tsdbDebugFlag = 0;
|
||||
tsLogEmbedded = 1;
|
||||
tsAsyncLog = 0;
|
||||
|
||||
std::string path = TD_TMP_DIR_PATH "transport";
|
||||
// taosRemoveDir(path.c_str());
|
||||
taosMkDir(path.c_str());
|
||||
|
||||
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
|
||||
if (taosInitLog("taosdlog", 1, false) != 0) {
|
||||
printf("failed to init log file\n");
|
||||
}
|
||||
}
|
||||
|
||||
class TransObj {
|
||||
public:
|
||||
TransObj() {
|
||||
initEnv();
|
||||
cli = new Client;
|
||||
cli->Init(1);
|
||||
srv = new Server;
|
||||
srv->Start();
|
||||
}
|
||||
|
||||
void RestartCli(CB cb) {
|
||||
//
|
||||
cli->Restart(cb);
|
||||
}
|
||||
void StopSrv() {
|
||||
//
|
||||
srv->Stop();
|
||||
}
|
||||
// call when link broken, and notify query or fetch stop
|
||||
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||
///////
|
||||
srv->SetSrvContinueSend(cfp);
|
||||
}
|
||||
void RestartSrv() { srv->Restart(); }
|
||||
void StopCli() {
|
||||
///////
|
||||
cli->Stop();
|
||||
}
|
||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
|
||||
|
||||
void cliSendReqWithId(SRpcMsg *req, int64_t *id) { cli->sendReqWithId(req, id);}
|
||||
void cliFreeReqId(int64_t *id) { cli->freeId(id);}
|
||||
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||
|
||||
~TransObj() {
|
||||
delete cli;
|
||||
delete srv;
|
||||
}
|
||||
|
||||
private:
|
||||
Client *cli;
|
||||
Server *srv;
|
||||
};
|
||||
class TransEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
// set up trans obj
|
||||
tr = new TransObj();
|
||||
}
|
||||
virtual void TearDown() {
|
||||
// tear down
|
||||
delete tr;
|
||||
}
|
||||
|
||||
TransObj *tr = NULL;
|
||||
};
|
||||
|
||||
TEST_F(TransEnv, 01sendAndRec) {
|
||||
// for (int i = 0; i < 10; i++) {
|
||||
// SRpcMsg req = {0}, resp = {0};
|
||||
// req.msgType = 0;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// assert(resp.code == 0);
|
||||
// }
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, 02StopServer) {
|
||||
// for (int i = 0; i < 1; i++) {
|
||||
// SRpcMsg req = {0}, resp = {0};
|
||||
// req.msgType = 0;
|
||||
// req.info.ahandle = (void *)0x35;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// assert(resp.code == 0);
|
||||
// }
|
||||
// SRpcMsg req = {0}, resp = {0};
|
||||
// req.info.ahandle = (void *)0x35;
|
||||
// req.msgType = 1;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->StopSrv();
|
||||
// // tr->RestartSrv();
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// assert(resp.code != 0);
|
||||
}
|
||||
TEST_F(TransEnv, clientUserDefined) {
|
||||
// tr->RestartSrv();
|
||||
// for (int i = 0; i < 10; i++) {
|
||||
// SRpcMsg req = {0}, resp = {0};
|
||||
// req.msgType = 0;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// assert(resp.code == 0);
|
||||
// }
|
||||
|
||||
//////////////////
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, cliPersistHandle) {
|
||||
// SRpcMsg resp = {0};
|
||||
// void *handle = NULL;
|
||||
// for (int i = 0; i < 10; i++) {
|
||||
// SRpcMsg req = {0};
|
||||
// req.info = resp.info;
|
||||
// req.info.persistHandle = 1;
|
||||
|
||||
// req.msgType = 1;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// // if (i == 5) {
|
||||
// // std::cout << "stop server" << std::endl;
|
||||
// // tr->StopSrv();
|
||||
// //}
|
||||
// // if (i >= 6) {
|
||||
// // EXPECT_TRUE(resp.code != 0);
|
||||
// //}
|
||||
// handle = resp.info.handle;
|
||||
// }
|
||||
// rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
|
||||
// for (int i = 0; i < 10; i++) {
|
||||
// SRpcMsg req = {0};
|
||||
// req.msgType = 1;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// }
|
||||
|
||||
// taosMsleep(1000);
|
||||
//////////////////
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, srvReleaseHandle) {
|
||||
// SRpcMsg resp = {0};
|
||||
// tr->SetSrvContinueSend(processReleaseHandleCb);
|
||||
// // tr->Restart(processReleaseHandleCb);
|
||||
// void *handle = NULL;
|
||||
// SRpcMsg req = {0};
|
||||
// for (int i = 0; i < 1; i++) {
|
||||
// memset(&req, 0, sizeof(req));
|
||||
// req.info = resp.info;
|
||||
// req.info.persistHandle = 1;
|
||||
// req.msgType = 1;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// // tr->cliSendAndRecvNoHandle(&req, &resp);
|
||||
// EXPECT_TRUE(resp.code == 0);
|
||||
// }
|
||||
//////////////////
|
||||
}
|
||||
// reopen later
|
||||
// TEST_F(TransEnv, cliReleaseHandleExcept) {
|
||||
// SRpcMsg resp = {0};
|
||||
// SRpcMsg req = {0};
|
||||
// for (int i = 0; i < 3; i++) {
|
||||
// memset(&req, 0, sizeof(req));
|
||||
// req.info = resp.info;
|
||||
// req.info.persistHandle = 1;
|
||||
// req.info.ahandle = (void *)1234;
|
||||
// req.msgType = 1;
|
||||
// req.pCont = rpcMallocCont(10);
|
||||
// req.contLen = 10;
|
||||
// tr->cliSendAndRecv(&req, &resp);
|
||||
// if (i == 1) {
|
||||
// std::cout << "stop server" << std::endl;
|
||||
// tr->StopSrv();
|
||||
// }
|
||||
// if (i > 1) {
|
||||
// EXPECT_TRUE(resp.code != 0);
|
||||
// }
|
||||
// }
|
||||
// //////////////////
|
||||
//}
|
||||
TEST_F(TransEnv, srvContinueSend) {
|
||||
// tr->SetSrvContinueSend(processContinueSend);
|
||||
// SRpcMsg req = {0}, resp = {0};
|
||||
// for (int i = 0; i < 10; i++) {
|
||||
// // memset(&req, 0, sizeof(req));
|
||||
// // memset(&resp, 0, sizeof(resp));
|
||||
// // req.msgType = 1;
|
||||
// // req.pCont = rpcMallocCont(10);
|
||||
// // req.contLen = 10;
|
||||
// // tr->cliSendAndRecv(&req, &resp);
|
||||
// }
|
||||
// taosMsleep(1000);
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||
// tr->SetSrvContinueSend(processContinueSend);
|
||||
// // tr->SetCliPersistFp(cliPersistHandle);
|
||||
// SRpcMsg resp = {0};
|
||||
// SRpcMsg req = {0};
|
||||
// for (int i = 0; i < 5; i++) {
|
||||
// // memset(&req, 0, sizeof(req));
|
||||
// // req.info = resp.info;
|
||||
// // req.msgType = 1;
|
||||
// // req.pCont = rpcMallocCont(10);
|
||||
// // req.contLen = 10;
|
||||
// // tr->cliSendAndRecv(&req, &resp);
|
||||
// // if (i > 2) {
|
||||
// // tr->StopCli();
|
||||
// // break;
|
||||
// //}
|
||||
// }
|
||||
// taosMsleep(2000);
|
||||
// conn broken
|
||||
//
|
||||
}
|
||||
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||
// tr->SetSrvContinueSend(processContinueSend);
|
||||
// SRpcMsg resp = {0};
|
||||
// SRpcMsg req = {0};
|
||||
// for (int i = 0; i < 5; i++) {
|
||||
// // memset(&req, 0, sizeof(req));
|
||||
// // req.info = resp.info;
|
||||
// // req.msgType = 1;
|
||||
// // req.pCont = rpcMallocCont(10);
|
||||
// // req.contLen = 10;
|
||||
// // tr->cliSendAndRecv(&req, &resp);
|
||||
// // if (i > 2) {
|
||||
// // tr->StopSrv();
|
||||
// // break;
|
||||
// //}
|
||||
// }
|
||||
// taosMsleep(2000);
|
||||
// // conn broken
|
||||
//
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
||||
// conn broken
|
||||
}
|
||||
TEST_F(TransEnv, queryExcept) {
|
||||
//taosMsleep(4 * 1000);
|
||||
}
|
||||
TEST_F(TransEnv, idTest) {
|
||||
SRpcMsg resp = {0};
|
||||
SRpcMsg req = {0};
|
||||
for (int i = 0; i < 50000; i++) {
|
||||
memset(&req, 0, sizeof(req));
|
||||
req.info.noResp = 0;
|
||||
req.msgType = 3;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
int64_t id;
|
||||
tr->cliSendReqWithId(&req, &id);
|
||||
tr->cliFreeReqId(&id);
|
||||
}
|
||||
taosMsleep(1000);
|
||||
// no resp
|
||||
}
|
||||
TEST_F(TransEnv, noResp) {
|
||||
SRpcMsg resp = {0};
|
||||
SRpcMsg req = {0};
|
||||
for (int i = 0; i < 500000; i++) {
|
||||
memset(&req, 0, sizeof(req));
|
||||
req.info.noResp = 0;
|
||||
req.msgType = 3;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
tr->cliSendReq(&req);
|
||||
//tr->cliSendAndRecv(&req, &resp);
|
||||
}
|
||||
taosMsleep(100000);
|
||||
// no resp
|
||||
}
|
Loading…
Reference in New Issue