diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5afafa08a3..f913ba06d0 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -29,6 +29,8 @@ extern "C" { extern int tsRpcHeadSize; +typedef struct SRpcPush SRpcPush; + typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; @@ -43,8 +45,17 @@ typedef struct SRpcMsg { int32_t code; void * handle; // rpc handle returned to app void * ahandle; // app handle set by client + int persist; // keep handle or not, default 0 + + SRpcPush *push; + } SRpcMsg; +typedef struct SRpcPush { + void *arg; + int (*callback)(void *arg, SRpcMsg *rpcMsg); +} SRpcPush; + typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose @@ -83,6 +94,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index decd0af484..29770831fa 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,12 +27,16 @@ typedef struct SCliConn { SConnBuffer readBuf; void* data; queue conn; - char spi; - char secured; uint64_t expireTime; int8_t notifyCount; // timers already notify to client - int32_t ref; + SRpcPush* push; + int persist; // + // spi configure + char spi; + char secured; + int32_t ref; + // debug and log info struct sockaddr_in addr; } SCliConn; @@ -128,13 +132,18 @@ static void clientHandleResp(SCliConn* conn) { tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port)); - if (pCtx->pSem == NULL) { - tTrace("client conn(sync) %p handle resp", conn); - (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); + + if (conn->push != NULL && conn->notifyCount != 0) { + (*conn->push->callback)(conn->push->arg, &rpcMsg); } else { - tTrace("client conn(sync) %p handle resp", conn); - memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); - tsem_post(pCtx->pSem); + if (pCtx->pSem == NULL) { + tTrace("client conn(sync) %p handle resp", conn); + (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); + } else { + tTrace("client conn(sync) %p handle resp", conn); + memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); + tsem_post(pCtx->pSem); + } } conn->notifyCount += 1; @@ -144,7 +153,10 @@ static void clientHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); SCliThrdObj* pThrd = conn->hostThrd; - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + // user owns conn->persist = 1 + if (conn->push != NULL) { + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + } destroyCmsg(pMsg); conn->data = NULL; @@ -154,7 +166,7 @@ static void clientHandleResp(SCliConn* conn) { } } static void clientHandleExcept(SCliConn* pConn) { - if (pConn->data == NULL) { + if (pConn->data == NULL && pConn->push == NULL) { // handle conn except in conn pool clientConnDestroy(pConn, true); return; @@ -162,20 +174,25 @@ static void clientHandleExcept(SCliConn* pConn) { tTrace("client conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; - destroyUserdata(&pMsg->msg); - STransConnCtx* pCtx = pMsg->ctx; SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - if (pCtx->pSem == NULL) { - // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); + + if (pConn->push != NULL && pConn->notifyCount != 0) { + (*pConn->push->callback)(pConn->push->arg, &rpcMsg); } else { - memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); - // SRpcMsg rpcMsg - tsem_post(pCtx->pSem); + if (pCtx->pSem == NULL) { + (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); + } else { + memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + // SRpcMsg rpcMsg + tsem_post(pCtx->pSem); + } + if (pConn->push != NULL) { + (*pConn->push->callback)(pConn->push->arg, &rpcMsg); + } } destroyCmsg(pMsg); @@ -411,6 +428,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("client msg tran time cost: %" PRIu64 "", el); et = taosGetTimestampUs(); + // if (pMsg->msg.handle != NULL) { + // // handle + //} + STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { @@ -426,6 +447,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { } clientWrite(conn); + conn->push = pMsg->msg.push; + } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -444,6 +467,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->hostThrd = pThrd; + conn->push = pMsg->msg.push; + struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect diff --git a/source/libs/transport/test/pushClient.c b/source/libs/transport/test/pushClient.c new file mode 100644 index 0000000000..2756eb4666 --- /dev/null +++ b/source/libs/transport/test/pushClient.c @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include "os.h" +#include "rpcLog.h" +#include "taoserror.h" +#include "tglobal.h" +#include "trpc.h" +#include "tutil.h" + +typedef struct { + int index; + SEpSet epSet; + int num; + int numOfReqs; + int msgSize; + tsem_t rspSem; + tsem_t * pOverSem; + pthread_t thread; + void * pRpc; +} SInfo; +static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SInfo *pInfo = (SInfo *)pMsg->ahandle; + tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + pMsg->code); + + if (pEpSet) pInfo->epSet = *pEpSet; + + rpcFreeCont(pMsg->pCont); + // tsem_post(&pInfo->rspSem); + tsem_post(&pInfo->rspSem); +} + +static int tcount = 0; + +typedef struct SPushArg { + tsem_t sem; + +} SPushArg; + +int pushCallback(void *arg, SRpcMsg *msg) { + SPushArg *push = arg; + tsem_post(&push->sem); +} +SRpcPush *createPushArg() { + SRpcPush *push = calloc(1, sizeof(SRpcPush)); + push->arg = calloc(1, sizeof(SPushArg)); + tsem_init(&push->arg->sem, 0, 0); + push->callback = pushCallback; + return push; +} +static void *sendRequest(void *param) { + SInfo * pInfo = (SInfo *)param; + SRpcMsg rpcMsg = {0}; + + tDebug("thread:%d, start to send request", pInfo->index); + + tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs); + int u100 = 0; + int u500 = 0; + int u1000 = 0; + int u10000 = 0; + + while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + SRpcPush *push = createPushArg(); + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.ahandle = pInfo; + rpcMsg.msgType = 1; + rpcMsg.push = push; + ; + // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + int64_t start = taosGetTimestampUs(); + rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); + if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + tsem_wait(&pInfo->rspSem); + // tsem_wait(&pInfo->rspSem); + tsem_wait(&push->sem); + int64_t end = taosGetTimestampUs() - start; + if (end <= 100) { + u100++; + } else if (end > 100 && end <= 500) { + u500++; + } else if (end > 500 && end < 1000) { + u1000++; + } else { + u10000++; + } + + tDebug("recv response succefully"); + + // usleep(100000000); + } + + tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000); + tDebug("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SEpSet epSet; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + char secret[20] = "mypassword"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + epSet.inUse = 0; + addEpIntoEpSet(&epSet, serverIp, 7000); + addEpIntoEpSet(&epSet, "192.168.0.1", 7000); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processResponse; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = "michael"; + rpcInit.secret = secret; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i = 1; i < argc; ++i) { + if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { + epSet.eps[0].port = atoi(argv[++i]); + } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { + tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn)); + } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) { + rpcInit.numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) { + msgSize = atoi(argv[++i]); + } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) { + rpcInit.sessions = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) { + numOfReqs = atoi(argv[++i]); + } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) { + appThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) { + tsCompressMsgSize = atoi(argv[++i]); + } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) { + rpcInit.user = argv[++i]; + } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) { + rpcInit.secret = argv[++i]; + } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) { + rpcInit.spi = atoi(argv[++i]); + } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { + rpcDebugFlag = atoi(argv[++i]); + } else { + printf("\nusage: %s [options] \n", argv[0]); + printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); + printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port); + printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); + printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); + printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); + printf(" [-a threads]: number of app threads, default is:%d\n", appThreads); + printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs); + printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); + printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user); + printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret); + printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi); + printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag); + printf(" [-h help]: print out this help\n\n"); + exit(0); + } + } + + taosInitLog("client.log", 100000, 10); + + void *pRpc = rpcOpen(&rpcInit); + if (pRpc == NULL) { + tError("failed to initialize RPC"); + return -1; + } + + tInfo("client is initialized"); + tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); + + gettimeofday(&systemTime, NULL); + startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + + SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads); + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + for (int i = 0; i < appThreads; ++i) { + pInfo->index = i; + pInfo->epSet = epSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + tsem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while (tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime) / 1000.0f; // mseconds + + tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); + tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); + + int ch = getchar(); + UNUSED(ch); + + taosCloseLog(); + + return 0; +}