From 781e869e4e716f72503665cc37bc3a78d20f24ae Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Apr 2022 18:11:45 +0800 Subject: [PATCH 1/2] handle except --- source/libs/transport/test/rclient.c | 45 +++++++++++++++------------- source/libs/transport/test/rserver.c | 7 ++++- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 7d3c3aa012..cdf0908167 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -23,20 +23,20 @@ #include "tutil.h" typedef struct { - int index; - SEpSet epSet; - int num; - int numOfReqs; - int msgSize; - tsem_t rspSem; - tsem_t * pOverSem; + int index; + SEpSet epSet; + int num; + int numOfReqs; + int msgSize; + tsem_t rspSem; + tsem_t * pOverSem; TdThread thread; - void * pRpc; + void * pRpc; } SInfo; static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - pMsg->code); + // tError("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + // pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -51,9 +51,9 @@ static void *sendRequest(void *param) { SInfo * pInfo = (SInfo *)param; SRpcMsg rpcMsg = {0}; - tDebug("thread:%d, start to send request", pInfo->index); + tError("thread:%d, start to send request", pInfo->index); - tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs); + tError("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs); int u100 = 0; int u500 = 0; int u1000 = 0; @@ -68,7 +68,7 @@ static void *sendRequest(void *param) { // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); int64_t start = taosGetTimestampUs(); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); - if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + if (pInfo->num % 20000 == 0) tError("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); // tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); int64_t end = taosGetTimestampUs() - start; @@ -88,7 +88,7 @@ static void *sendRequest(void *param) { } tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000); - tDebug("thread:%d, it is over", pInfo->index); + tError("thread:%d, it is over", pInfo->index); tcount++; return NULL; @@ -104,7 +104,7 @@ int main(int argc, char *argv[]) { char secret[20] = "mypassword"; struct timeval systemTime; int64_t startTime, endTime; - TdThreadAttr thattr; + TdThreadAttr thattr; // server info epSet.inUse = 0; @@ -124,7 +124,7 @@ int main(int argc, char *argv[]) { rpcInit.ckey = "key"; rpcInit.spi = 1; rpcInit.connType = TAOS_CONN_CLIENT; - rpcDebugFlag = 143; + rpcDebugFlag = 131; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { @@ -170,6 +170,10 @@ int main(int argc, char *argv[]) { } } + const char *path = "/tmp/transport/client"; + taosRemoveDir(path); + taosMkDir(path); + tstrncpy(tsLogDir, path, PATH_MAX); taosInitLog("client.log", 10); void *pRpc = rpcOpen(&rpcInit); @@ -178,8 +182,8 @@ int main(int argc, char *argv[]) { return -1; } - tInfo("client is initialized"); - tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); + tError("client is initialized"); + tError("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); taosGetTimeOfDay(&systemTime); startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; @@ -208,8 +212,9 @@ int main(int argc, char *argv[]) { endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; float usedTime = (endTime - startTime) / 1000.0f; // mseconds - tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); - tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); + tError("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); + tError("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, + msgSize); int ch = getchar(); UNUSED(ch); diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 3a086371b0..8ed3bbc960 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -125,7 +125,7 @@ int main(int argc, char *argv[]) { rpcInit.idleTime = 2 * 1500; rpcInit.afp = retrieveAuthInfo; - rpcDebugFlag = 143; + rpcDebugFlag = 131; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { @@ -160,6 +160,11 @@ int main(int argc, char *argv[]) { tsAsyncLog = 0; rpcInit.connType = TAOS_CONN_SERVER; + + const char *path = "/tmp/transport/server"; + taosRemoveDir(path); + taosMkDir(path); + tstrncpy(tsLogDir, path, PATH_MAX); taosInitLog("server.log", 10); void *pRpc = rpcOpen(&rpcInit); From ac82b91ed17759cdc5ef819e8877ffae0d32a3e2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Apr 2022 23:00:12 +0800 Subject: [PATCH 2/2] handle except --- source/libs/transport/src/transSrv.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index dfb5eb35d6..79c72b3a35 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -193,6 +193,7 @@ static void uvHandleReq(SSrvConn* pConn) { transMsg.ahandle = (void*)pHead->ahandle; transMsg.handle = NULL; + // transDestroyBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf); pConn->inType = pHead->msgType; if (pConn->status == ConnNormal) { @@ -249,6 +250,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { conn->broken = true; if (conn->status == ConnAcquire) { if (conn->regArg.init) { + tTrace("server conn %p broken, notify server app", conn); STrans* pTransInst = conn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); @@ -270,7 +272,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnSendCb(uv_write_t* req, int status) { SSrvConn* conn = req->data; - transClearBuffer(&conn->readBuf); + // transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) {