Merge pull request #28547 from taosdata/fix/fixInvalidWrite

opt transport
This commit is contained in:
Shengliang Guan 2024-10-30 20:26:08 +08:00 committed by GitHub
commit 5ed4bced28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 682 additions and 80 deletions

View File

@ -353,6 +353,7 @@ typedef struct {
queue node;
void (*freeFunc)(void* arg);
int32_t size;
int8_t inited;
} STransQueue;
/*

View File

@ -127,10 +127,12 @@ typedef struct {
typedef struct SCliReq {
SReqCtx* ctx;
queue q;
queue sendQ;
STransMsgType type;
uint64_t st;
int64_t seq;
int32_t sent; //(0: no send, 1: alread sent)
int8_t inSendQ;
STransMsg msg;
int8_t inRetry;
@ -274,6 +276,8 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq);
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
@ -453,6 +457,7 @@ static bool filteBySeq(void* key, void* arg) {
SFiterArg* targ = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
removeReqFromSendQ(pReq);
return true;
} else {
return false;
@ -539,6 +544,7 @@ bool filterByQid(void* key, void* arg) {
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == *qid) {
removeReqFromSendQ(pReq);
return true;
} else {
return false;
@ -600,7 +606,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
queue* el = QUEUE_HEAD(&set);
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
removeReqFromSendQ(pReq);
STraceId* trace = &pReq->msg.info.traceId;
tGDebug("start to free msg %p", pReq);
destroyReqWrapper(pReq, pThrd);
@ -700,6 +706,7 @@ void cliHandleResp(SCliConn* conn) {
tstrerror(code));
}
}
removeReqFromSendQ(pReq);
code = cliBuildRespFromCont(pReq, &resp, pHead);
STraceId* trace = &resp.info.traceId;
@ -905,6 +912,10 @@ static void addConnToPool(void* pool, SCliConn* conn) {
}
SCliThrd* thrd = conn->hostThrd;
if (thrd->quit == true) {
return;
}
cliResetConnTimer(conn);
if (conn->list == NULL && conn->dstAddr != NULL) {
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
@ -1092,6 +1103,7 @@ _failed:
transQueueDestroy(&conn->reqsToSend);
transQueueDestroy(&conn->reqsSentOut);
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->ipStr);
}
tError("failed to create conn, code:%d", code);
taosMemoryFree(conn);
@ -1216,6 +1228,7 @@ static FORCE_INLINE void destroyReqInQueue(SCliConn* conn, queue* set, int32_t c
QUEUE_REMOVE(el);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
removeReqFromSendQ(pReq);
notifyAndDestroyReq(conn, pReq, code);
}
}
@ -1246,8 +1259,8 @@ static void cliHandleException(SCliConn* conn) {
}
cliDestroyAllQidFromThrd(conn);
QUEUE_REMOVE(&conn->q);
if (conn->list) {
if (pThrd->quit == false && conn->list) {
QUEUE_REMOVE(&conn->q);
conn->list->totalSize -= 1;
conn->list = NULL;
}
@ -1273,7 +1286,8 @@ static void cliHandleException(SCliConn* conn) {
bool filterToRmReq(void* h, void* arg) {
queue* el = h;
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) {
removeReqFromSendQ(pReq);
return true;
}
return false;
@ -1300,12 +1314,18 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
while (!QUEUE_IS_EMPTY(&wrapper->node)) {
queue* h = QUEUE_HEAD(&wrapper->node);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ);
removeReqFromSendQ(pReq);
}
freeWReqToWQ(&conn->wq, wrapper);
int32_t ref = transUnrefCliHandle(conn);
if (ref <= 0) {
return;
}
cliConnRmReqs(conn);
if (status != 0) {
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
@ -1340,6 +1360,9 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg
}
STransMsgHead* pHead = *ppHead;
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
if (tHead == NULL) {
return false;
}
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
@ -1398,6 +1421,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
int j = 0;
int32_t batchLimit = 64;
queue reqToSend;
QUEUE_INIT(&reqToSend);
while (!transQueueEmpty(&pConn->reqsToSend)) {
queue* h = transQueuePop(&pConn->reqsToSend);
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
@ -1422,6 +1449,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) {
content = transContFromHead(pHead);
contLen = transContLenFromMsg(msgLen);
} else {
if (pConn->userInited == 0) {
return terrno;
}
}
if (pHead->comp == 0) {
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
@ -1447,30 +1478,51 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
wb[j++] = uv_buf_init((char*)pHead, msgLen);
totalLen += msgLen;
pCliMsg->sent = 1;
pCliMsg->seq = pConn->seq;
pCliMsg->sent = 1;
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
QUEUE_INIT(&pCliMsg->sendQ);
QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ);
pCliMsg->inSendQ = 1;
if (j >= batchLimit) {
break;
}
}
transRefCliHandle(pConn);
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
if (req == NULL) {
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
while (!QUEUE_IS_EMPTY(&reqToSend)) {
queue* h = QUEUE_HEAD(&reqToSend);
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
removeReqFromSendQ(pCliMsg);
}
transRefCliHandle(pConn);
return terrno;
}
SWReqsWrapper* pWreq = req->data;
QUEUE_MOVE(&reqToSend, &pWreq->node);
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
if (ret != 0) {
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
queue* h = QUEUE_HEAD(&pWreq->node);
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
removeReqFromSendQ(pCliMsg);
}
freeWReqToWQ(&pConn->wq, req->data);
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_UNUSED(transUnrefCliHandle(pConn));
@ -2182,11 +2234,21 @@ static void cliAsyncCb(uv_async_t* handle) {
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
}
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq) {
if (pReq == NULL || pReq->inSendQ == 0) {
return;
}
QUEUE_REMOVE(&pReq->sendQ);
pReq->inSendQ = 0;
}
static FORCE_INLINE void destroyReq(void* arg) {
SCliReq* pReq = arg;
if (pReq == NULL) {
return;
}
removeReqFromSendQ(pReq);
STraceId* trace = &pReq->msg.info.traceId;
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
@ -2961,6 +3023,7 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
STrans* pInst = pThrd->pInst;
if (pReq != NULL) {
removeReqFromSendQ(pReq);
if (pResp->code != TSDB_CODE_SUCCESS) {
if (cliMayRetry(pConn, pReq, pResp)) {
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
@ -3114,7 +3177,7 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe
if (ctx != NULL) pCtx->userCtx = *ctx;
pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
if (pReq == NULL) {
if (pCliReq == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _exception);
}
@ -3183,6 +3246,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
int8_t transIdInited = 0;
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
if (pInst == NULL) {
@ -3200,6 +3264,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
if (exh == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
}
transIdInited = 1;
pReq->info.handle = (void*)(*transpointId);
pReq->info.qId = *transpointId;
@ -3216,9 +3281,6 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
}
// if (pReq->msgType == TDMT_SCH_DROP_TASK) {
// TAOS_UNUSED(transReleaseCliHandle(pReq->info.handle));
// }
transReleaseExHandle(transGetRefMgt(), *transpointId);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
return 0;
@ -3226,6 +3288,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
_exception:
transFreeMsg(pReq->pCont);
pReq->pCont = NULL;
if (transIdInited) transReleaseExHandle(transGetRefMgt(), *transpointId);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
tError("failed to send request since %s", tstrerror(code));
@ -3641,6 +3704,7 @@ bool filterTimeoutReq(void* key, void* arg) {
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
int64_t elapse = ((st - pReq->st) / 1000000);
if (listArg && elapse >= listArg->pInst->readTimeout) {
removeReqFromSendQ(pReq);
return true;
} else {
return false;

View File

@ -423,6 +423,7 @@ int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
QUEUE_INIT(&wq->node);
wq->freeFunc = (void (*)(void*))freeFunc;
wq->size = 0;
wq->inited = 1;
return 0;
}
void transQueuePush(STransQueue* q, void* arg) {
@ -497,6 +498,7 @@ void transQueueRemove(STransQueue* q, void* e) {
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
void transQueueClear(STransQueue* q) {
if (q->inited == 0) return;
while (!QUEUE_IS_EMPTY(&q->node)) {
queue* h = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(h);

View File

@ -1289,8 +1289,8 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
int32_t code = 0;
SWorkThrd* pThrd = hThrd;
int32_t lino;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
int8_t wqInited = 0;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
if (pConn == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
@ -1340,6 +1340,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
code = initWQ(&pConn->wq);
TAOS_CHECK_GOTO(code, &lino, _end);
wqInited = 1;
// init client handle
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
@ -1372,7 +1373,7 @@ _end:
transDestroyBuffer(&pConn->readBuf);
taosHashCleanup(pConn->pQTable);
taosMemoryFree(pConn->pTcp);
destroyWQ(&pConn->wq);
if (wqInited) destroyWQ(&pConn->wq);
taosMemoryFree(pConn->buf);
taosMemoryFree(pConn);
pConn = NULL;

View File

@ -1,5 +1,6 @@
add_executable(transportTest "")
add_executable(transUT "")
add_executable(transUT2 "")
add_executable(svrBench "")
add_executable(cliBench "")
add_executable(httpBench "")
@ -9,6 +10,10 @@ target_sources(transUT
"transUT.cpp"
)
target_sources(transUT2
PRIVATE
"transUT2.cpp"
)
target_sources(transportTest
PRIVATE
"transportTests.cpp"
@ -56,6 +61,20 @@ target_include_directories(transUT
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (transUT2
os
util
common
gtest_main
transport
)
target_include_directories(transUT2
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(svrBench
PUBLIC
"${TD_SOURCE_DIR}/include/libs/transport"

View File

@ -53,8 +53,6 @@ static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
tsem_post(&pInfo->rspSem);
}
@ -72,12 +70,12 @@ static void *sendRequest(void *param) {
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.info.ahandle = pInfo;
rpcMsg.info.noResp = 1;
rpcMsg.info.noResp = 0;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem);
}
tDebug("thread:%d, it is over", pInfo->index);
@ -110,17 +108,15 @@ int main(int argc, char *argv[]) {
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processResponse;
rpcInit.sessions = 100;
rpcInit.sessions = 1000;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "michael";
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.connLimitNum = 10;
rpcInit.connLimitLock = 1;
rpcInit.shareConnLimit = 16 * 1024;
rpcInit.shareConnLimit = tsShareConnLimit;
rpcInit.supportBatch = 1;
rpcDebugFlag = 135;
rpcInit.compressSize = -1;
rpcDebugFlag = 143;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
@ -139,6 +135,10 @@ int main(int argc, char *argv[]) {
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
} else if (strcmp(argv[i], "-l") == 0 && i < argc - 1) {
rpcInit.shareConnLimit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-c") == 0 && i < argc - 1) {
rpcInit.compressSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
@ -150,6 +150,8 @@ int main(int argc, char *argv[]) {
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-c compressSize]: compress size, default:%d\n", tsCompressMsgSize);
printf(" [-l shareConnLimit]: share conn limit, default:%d\n", tsShareConnLimit);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
@ -168,18 +170,18 @@ int main(int argc, char *argv[]) {
int64_t now = taosGetTimestampUs();
SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
SInfo *p = pInfo;
SInfo **pInfo = (SInfo **)taosMemoryCalloc(1, sizeof(SInfo *) * appThreads);
for (int i = 0; i < appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
SInfo *p = taosMemoryCalloc(1, sizeof(SInfo));
p->index = i;
p->epSet = epSet;
p->numOfReqs = numOfReqs;
p->msgSize = msgSize;
tsem_init(&p->rspSem, 0, 0);
p->pRpc = pRpc;
pInfo[i] = p;
taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo);
pInfo++;
taosThreadCreate(&p->thread, NULL, sendRequest, pInfo[i]);
}
do {
@ -192,12 +194,14 @@ int main(int argc, char *argv[]) {
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
for (int i = 0; i < appThreads; i++) {
SInfo *pInfo = p;
taosThreadJoin(pInfo->thread, NULL);
p++;
SInfo *p = pInfo[i];
taosThreadJoin(p->thread, NULL);
taosMemoryFree(p);
}
int ch = getchar();
UNUSED(ch);
taosMemoryFree(pInfo);
// int ch = getchar();
// UNUSED(ch);
taosCloseLog();

View File

@ -76,23 +76,6 @@ void *processShellMsg(void *arg) {
for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
if (pDataFile != NULL) {
if (taosWriteFile(pDataFile, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
tInfo("failed to write data file, reason:%s", strerror(errno));
}
}
}
if (commit >= 2) {
num += numOfMsgs;
// if (taosFsync(pDataFile) < 0) {
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
//}
if (num % 10000 == 0) {
tInfo("%d request have been written into disk", num);
}
}
taosResetQitems(qall);
@ -107,16 +90,7 @@ void *processShellMsg(void *arg) {
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
void *handle = pRpcMsg->info.handle;
taosFreeQitem(pRpcMsg);
//{
// SRpcMsg nRpcMsg = {0};
// nRpcMsg.pCont = rpcMallocCont(msgSize);
// nRpcMsg.contLen = msgSize;
// nRpcMsg.info.handle = handle;
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
// rpcSendResponse(&nRpcMsg);
//}
}
taosUpdateItemSize(qinfo.queue, numOfMsgs);
@ -149,12 +123,13 @@ int main(int argc, char *argv[]) {
rpcInit.localPort = 7000;
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
rpcInit.label = "SER";
rpcInit.numOfThreads = 1;
rpcInit.numOfThreads = 10;
rpcInit.cfp = processRequestMsg;
rpcInit.idleTime = 2 * 1500;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
rpcDebugFlag = 131;
rpcInit.compressSize = -1;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
@ -205,8 +180,8 @@ int main(int argc, char *argv[]) {
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
}
int32_t numOfAthread = 5;
multiQ = taosMemoryMalloc(sizeof(numOfAthread));
int32_t numOfAthread = 1;
multiQ = taosMemoryMalloc(sizeof(MultiThreadQhandle));
multiQ->numOfThread = numOfAthread;
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
@ -221,11 +196,6 @@ int main(int argc, char *argv[]) {
threads[i].idx = i;
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
}
// qhandle = taosOpenQueue();
// qset = taosOpenQset();
// taosAddIntoQset(qset, qhandle, NULL);
// processShellMsg();
if (pDataFile != NULL) {
taosCloseFile(&pDataFile);

View File

@ -54,6 +54,7 @@ class Client {
rpcInit_.user = (char *)user;
rpcInit_.parent = this;
rpcInit_.connType = TAOS_CONN_CLIENT;
rpcInit_.shareConnLimit = 200;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
this->transCli = rpcOpen(&rpcInit_);
@ -85,6 +86,14 @@ class Client {
SemWait();
*resp = this->resp;
}
void sendReq(SRpcMsg *req) {
SEpSet epSet = {0};
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
rpcSendRequest(this->transCli, &epSet, req, NULL);
}
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
if (req->info.handle != NULL) {
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
@ -160,6 +169,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
rpcMsg.contLen = 100;
rpcMsg.info = pMsg->info;
rpcMsg.code = 0;
rpcFreeCont(pMsg->pCont);
rpcSendResponse(&rpcMsg);
}
@ -264,6 +274,7 @@ class TransObj {
cli->Stop();
}
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
~TransObj() {
@ -492,15 +503,16 @@ TEST_F(TransEnv, queryExcept) {
TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
SRpcMsg req = {0};
// for (int i = 0; i < 5; i++) {
// memset(&req, 0, sizeof(req));
// req.info.noResp = 1;
// req.msgType = 1;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
//}
// taosMsleep(2000);
for (int i = 0; i < 500000; i++) {
memset(&req, 0, sizeof(req));
req.info.noResp = 1;
req.msgType = 3;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendReq(&req);
//tr->cliSendAndRecv(&req, &resp);
}
taosMsleep(2000);
// no resp
}

View File

@ -0,0 +1,529 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "tdatablock.h"
#include "tglobal.h"
#include "tlog.h"
#include "tmisce.h"
#include "transLog.h"
#include "trpc.h"
#include "tversion.h"
using namespace std;
const char *label = "APP";
const char *secret = "secret";
const char *user = "user";
const char *ckey = "ckey";
class Server;
int port = 7000;
// server process
// server except
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
// client process;
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
class Client {
public:
void Init(int nThread) {
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
memset(&rpcInit_, 0, sizeof(rpcInit_));
rpcInit_.localPort = 0;
rpcInit_.label = (char *)"client";
rpcInit_.numOfThreads = nThread;
rpcInit_.cfp = processResp;
rpcInit_.user = (char *)user;
rpcInit_.parent = this;
rpcInit_.connType = TAOS_CONN_CLIENT;
rpcInit_.shareConnLimit = 200;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
this->transCli = rpcOpen(&rpcInit_);
//tsem_init(&this->sem, 0, 0);
}
void SetResp(SRpcMsg *pMsg) {
// set up resp;
this->resp = *pMsg;
}
SRpcMsg *Resp() { return &this->resp; }
void Restart(CB cb) {
rpcClose(this->transCli);
rpcInit_.cfp = cb;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
this->transCli = rpcOpen(&rpcInit_);
}
void Stop() {
rpcClose(this->transCli);
this->transCli = NULL;
}
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
SEpSet epSet = {0};
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
rpcSendRequest(this->transCli, &epSet, req, NULL);
SemWait();
*resp = this->resp;
}
void sendReq(SRpcMsg *req) {
SEpSet epSet = {0};
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
rpcSendRequest(this->transCli, &epSet, req, NULL);
}
void sendReqWithId(SRpcMsg *req, int64_t *id) {
SEpSet epSet = {0};
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "127.0.0.1",7000);
rpcSendRequestWithCtx(this->transCli, &epSet, req, id, NULL);
}
void freeId(int64_t *id) {
rpcFreeConnById(this->transCli, *id);
}
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
if (req->info.handle != NULL) {
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
req->info.handle = NULL;
}
SendAndRecv(req, resp);
}
void SemWait() { tsem_wait(&this->sem); }
void SemPost() { tsem_post(&this->sem); }
void Reset() {}
~Client() {
if (this->transCli) rpcClose(this->transCli);
}
private:
tsem_t sem;
SRpcInit rpcInit_;
void *transCli;
SRpcMsg resp;
};
class Server {
public:
Server() {
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
memset(&rpcInit_, 0, sizeof(rpcInit_));
memcpy(rpcInit_.localFqdn, "localhost", strlen("localhost"));
rpcInit_.localPort = port;
rpcInit_.label = (char *)"server";
rpcInit_.numOfThreads = 5;
rpcInit_.cfp = processReq;
rpcInit_.user = (char *)user;
rpcInit_.connType = TAOS_CONN_SERVER;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
}
void Start() {
this->transSrv = rpcOpen(&this->rpcInit_);
taosMsleep(1000);
}
void SetSrvContinueSend(CB cb) {
this->Stop();
rpcInit_.cfp = cb;
this->Start();
}
void Stop() {
if (this->transSrv == NULL) return;
rpcClose(this->transSrv);
this->transSrv = NULL;
}
void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
this->Stop();
rpcInit_.cfp = cfp;
this->Start();
}
void Restart() {
this->Stop();
this->Start();
}
~Server() {
if (this->transSrv) rpcClose(this->transSrv);
this->transSrv = NULL;
}
private:
SRpcInit rpcInit_;
void *transSrv;
};
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100;
rpcMsg.info = pMsg->info;
rpcMsg.code = 0;
rpcFreeCont(pMsg->pCont);
rpcSendResponse(&rpcMsg);
}
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
// for (int i = 0; i < 10; i++) {
// SRpcMsg rpcMsg = {0};
// rpcMsg.pCont = rpcMallocCont(100);
// rpcMsg.contLen = 100;
// rpcMsg.info = pMsg->info;
// rpcMsg.code = 0;
// rpcSendResponse(&rpcMsg);
// }
}
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100;
rpcMsg.info = pMsg->info;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
}
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
// {
// SRpcMsg rpcMsg1 = {0};
// rpcMsg1.pCont = rpcMallocCont(100);
// rpcMsg1.contLen = 100;
// rpcMsg1.info = pMsg->info;
// rpcMsg1.code = 0;
// rpcRegisterBrokenLinkArg(&rpcMsg1);
// }
// taosMsleep(10);
// SRpcMsg rpcMsg = {0};
// rpcMsg.pCont = rpcMallocCont(100);
// rpcMsg.contLen = 100;
// rpcMsg.info = pMsg->info;
// rpcMsg.code = 0;
// rpcSendResponse(&rpcMsg);
}
// client process;
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
Client *client = (Client *)parent;
rpcFreeCont(pMsg->pCont);
STraceId *trace = (STraceId *)&pMsg->info.traceId;
tGDebug("received resp %s",tstrerror(pMsg->code));
}
static void initEnv() {
dDebugFlag = 143;
vDebugFlag = 0;
mDebugFlag = 143;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 143;
uDebugFlag = 143;
rpcDebugFlag = 143;
qDebugFlag = 0;
wDebugFlag = 0;
sDebugFlag = 0;
tsdbDebugFlag = 0;
tsLogEmbedded = 1;
tsAsyncLog = 0;
std::string path = TD_TMP_DIR_PATH "transport";
// taosRemoveDir(path.c_str());
taosMkDir(path.c_str());
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
if (taosInitLog("taosdlog", 1, false) != 0) {
printf("failed to init log file\n");
}
}
class TransObj {
public:
TransObj() {
initEnv();
cli = new Client;
cli->Init(1);
srv = new Server;
srv->Start();
}
void RestartCli(CB cb) {
//
cli->Restart(cb);
}
void StopSrv() {
//
srv->Stop();
}
// call when link broken, and notify query or fetch stop
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
///////
srv->SetSrvContinueSend(cfp);
}
void RestartSrv() { srv->Restart(); }
void StopCli() {
///////
cli->Stop();
}
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
void cliSendReqWithId(SRpcMsg *req, int64_t *id) { cli->sendReqWithId(req, id);}
void cliFreeReqId(int64_t *id) { cli->freeId(id);}
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
~TransObj() {
delete cli;
delete srv;
}
private:
Client *cli;
Server *srv;
};
class TransEnv : public ::testing::Test {
protected:
virtual void SetUp() {
// set up trans obj
tr = new TransObj();
}
virtual void TearDown() {
// tear down
delete tr;
}
TransObj *tr = NULL;
};
TEST_F(TransEnv, 01sendAndRec) {
// for (int i = 0; i < 10; i++) {
// SRpcMsg req = {0}, resp = {0};
// req.msgType = 0;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// assert(resp.code == 0);
// }
}
TEST_F(TransEnv, 02StopServer) {
// for (int i = 0; i < 1; i++) {
// SRpcMsg req = {0}, resp = {0};
// req.msgType = 0;
// req.info.ahandle = (void *)0x35;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// assert(resp.code == 0);
// }
// SRpcMsg req = {0}, resp = {0};
// req.info.ahandle = (void *)0x35;
// req.msgType = 1;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->StopSrv();
// // tr->RestartSrv();
// tr->cliSendAndRecv(&req, &resp);
// assert(resp.code != 0);
}
TEST_F(TransEnv, clientUserDefined) {
// tr->RestartSrv();
// for (int i = 0; i < 10; i++) {
// SRpcMsg req = {0}, resp = {0};
// req.msgType = 0;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// assert(resp.code == 0);
// }
//////////////////
}
TEST_F(TransEnv, cliPersistHandle) {
// SRpcMsg resp = {0};
// void *handle = NULL;
// for (int i = 0; i < 10; i++) {
// SRpcMsg req = {0};
// req.info = resp.info;
// req.info.persistHandle = 1;
// req.msgType = 1;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// // if (i == 5) {
// // std::cout << "stop server" << std::endl;
// // tr->StopSrv();
// //}
// // if (i >= 6) {
// // EXPECT_TRUE(resp.code != 0);
// //}
// handle = resp.info.handle;
// }
// rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
// for (int i = 0; i < 10; i++) {
// SRpcMsg req = {0};
// req.msgType = 1;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// }
// taosMsleep(1000);
//////////////////
}
TEST_F(TransEnv, srvReleaseHandle) {
// SRpcMsg resp = {0};
// tr->SetSrvContinueSend(processReleaseHandleCb);
// // tr->Restart(processReleaseHandleCb);
// void *handle = NULL;
// SRpcMsg req = {0};
// for (int i = 0; i < 1; i++) {
// memset(&req, 0, sizeof(req));
// req.info = resp.info;
// req.info.persistHandle = 1;
// req.msgType = 1;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// // tr->cliSendAndRecvNoHandle(&req, &resp);
// EXPECT_TRUE(resp.code == 0);
// }
//////////////////
}
// reopen later
// TEST_F(TransEnv, cliReleaseHandleExcept) {
// SRpcMsg resp = {0};
// SRpcMsg req = {0};
// for (int i = 0; i < 3; i++) {
// memset(&req, 0, sizeof(req));
// req.info = resp.info;
// req.info.persistHandle = 1;
// req.info.ahandle = (void *)1234;
// req.msgType = 1;
// req.pCont = rpcMallocCont(10);
// req.contLen = 10;
// tr->cliSendAndRecv(&req, &resp);
// if (i == 1) {
// std::cout << "stop server" << std::endl;
// tr->StopSrv();
// }
// if (i > 1) {
// EXPECT_TRUE(resp.code != 0);
// }
// }
// //////////////////
//}
TEST_F(TransEnv, srvContinueSend) {
// tr->SetSrvContinueSend(processContinueSend);
// SRpcMsg req = {0}, resp = {0};
// for (int i = 0; i < 10; i++) {
// // memset(&req, 0, sizeof(req));
// // memset(&resp, 0, sizeof(resp));
// // req.msgType = 1;
// // req.pCont = rpcMallocCont(10);
// // req.contLen = 10;
// // tr->cliSendAndRecv(&req, &resp);
// }
// taosMsleep(1000);
}
TEST_F(TransEnv, srvPersistHandleExcept) {
// tr->SetSrvContinueSend(processContinueSend);
// // tr->SetCliPersistFp(cliPersistHandle);
// SRpcMsg resp = {0};
// SRpcMsg req = {0};
// for (int i = 0; i < 5; i++) {
// // memset(&req, 0, sizeof(req));
// // req.info = resp.info;
// // req.msgType = 1;
// // req.pCont = rpcMallocCont(10);
// // req.contLen = 10;
// // tr->cliSendAndRecv(&req, &resp);
// // if (i > 2) {
// // tr->StopCli();
// // break;
// //}
// }
// taosMsleep(2000);
// conn broken
//
}
TEST_F(TransEnv, cliPersistHandleExcept) {
// tr->SetSrvContinueSend(processContinueSend);
// SRpcMsg resp = {0};
// SRpcMsg req = {0};
// for (int i = 0; i < 5; i++) {
// // memset(&req, 0, sizeof(req));
// // req.info = resp.info;
// // req.msgType = 1;
// // req.pCont = rpcMallocCont(10);
// // req.contLen = 10;
// // tr->cliSendAndRecv(&req, &resp);
// // if (i > 2) {
// // tr->StopSrv();
// // break;
// //}
// }
// taosMsleep(2000);
// // conn broken
//
}
TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken
}
TEST_F(TransEnv, queryExcept) {
//taosMsleep(4 * 1000);
}
TEST_F(TransEnv, idTest) {
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 50000; i++) {
memset(&req, 0, sizeof(req));
req.info.noResp = 0;
req.msgType = 3;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
int64_t id;
tr->cliSendReqWithId(&req, &id);
tr->cliFreeReqId(&id);
}
taosMsleep(1000);
// no resp
}
TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
SRpcMsg req = {0};
for (int i = 0; i < 500000; i++) {
memset(&req, 0, sizeof(req));
req.info.noResp = 0;
req.msgType = 3;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendReq(&req);
//tr->cliSendAndRecv(&req, &resp);
}
taosMsleep(10000);
// no resp
}