diff --git a/src/rpc/inc/rpcCache.h b/source/libs/transport/inc/rpcCache.h similarity index 100% rename from src/rpc/inc/rpcCache.h rename to source/libs/transport/inc/rpcCache.h diff --git a/src/rpc/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h similarity index 100% rename from src/rpc/inc/rpcHead.h rename to source/libs/transport/inc/rpcHead.h diff --git a/src/rpc/inc/rpcLog.h b/source/libs/transport/inc/rpcLog.h similarity index 100% rename from src/rpc/inc/rpcLog.h rename to source/libs/transport/inc/rpcLog.h diff --git a/src/rpc/inc/rpcTcp.h b/source/libs/transport/inc/rpcTcp.h similarity index 100% rename from src/rpc/inc/rpcTcp.h rename to source/libs/transport/inc/rpcTcp.h diff --git a/src/rpc/inc/rpcUdp.h b/source/libs/transport/inc/rpcUdp.h similarity index 100% rename from src/rpc/inc/rpcUdp.h rename to source/libs/transport/inc/rpcUdp.h diff --git a/src/rpc/src/rpcCache.c b/source/libs/transport/src/rpcCache.c similarity index 100% rename from src/rpc/src/rpcCache.c rename to source/libs/transport/src/rpcCache.c diff --git a/src/rpc/src/rpcMain.c b/source/libs/transport/src/rpcMain.c similarity index 99% rename from src/rpc/src/rpcMain.c rename to source/libs/transport/src/rpcMain.c index 9ea5fd5392..9bf2773dde 100644 --- a/src/rpc/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -55,8 +55,9 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(SRpcMsg *, SRpcEpSet *); - int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); + void *owner; + void (*cfp)(void * owner, SRpcMsg *, SRpcEpSet *); + int (*afp)(void * owner, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t refCount; void *idPool; // handle to ID pool @@ -259,6 +260,7 @@ void *rpcOpen(const SRpcInit *pInit) { if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); pRpc->spi = pInit->spi; + pRpc->owner = pInit->owner; pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; pRpc->refCount = 1; @@ -741,7 +743,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { if (pConn->user[0] == 0) { terrno = TSDB_CODE_RPC_AUTH_REQUIRED; } else { - terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); + terrno = (*pRpc->afp)(pRpc->owner, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); } if (terrno != 0) { @@ -1021,7 +1023,7 @@ static void doRpcReportBrokenLinkToServer(void *param, void *id) { SRpcMsg *pRpcMsg = (SRpcMsg *)(param); SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); SRpcInfo *pRpc = pConn->pRpc; - (*(pRpc->cfp))(pRpcMsg, NULL); + (*(pRpc->cfp))(pRpc->owner, pRpcMsg, NULL); free(pRpcMsg); } static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { @@ -1136,7 +1138,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet; - (*pRpc->cfp)(pMsg, pEpSet); + (*pRpc->cfp)(pRpc->owner, pMsg, pEpSet); } // free the request message @@ -1160,7 +1162,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcAddRef(pRpc); // add the refCount for requests // notify the server app - (*(pRpc->cfp))(&rpcMsg, NULL); + (*(pRpc->cfp))(pRpc->owner, &rpcMsg, NULL); } else { // it's a response rpcMsg.handle = pContext; diff --git a/src/rpc/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c similarity index 100% rename from src/rpc/src/rpcTcp.c rename to source/libs/transport/src/rpcTcp.c diff --git a/src/rpc/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c similarity index 100% rename from src/rpc/src/rpcUdp.c rename to source/libs/transport/src/rpcUdp.c diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt deleted file mode 100644 index 14b77356ba..0000000000 --- a/src/rpc/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) -PROJECT(TDengine) - -INCLUDE_DIRECTORIES(inc) -AUX_SOURCE_DIRECTORY(src SRC) - -ADD_LIBRARY(trpc ${SRC}) -TARGET_LINK_LIBRARIES(trpc tutil lz4 common) - -ADD_SUBDIRECTORY(test) diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt deleted file mode 100644 index a32ac9943d..0000000000 --- a/src/rpc/test/CMakeLists.txt +++ /dev/null @@ -1,32 +0,0 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) -PROJECT(TDengine) - -INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc) - -IF (TD_LINUX) - LIST(APPEND CLIENT_SRC ./rclient.c) - ADD_EXECUTABLE(rclient ${CLIENT_SRC}) - TARGET_LINK_LIBRARIES(rclient trpc) - - LIST(APPEND SCLIENT_SRC ./rsclient.c) - ADD_EXECUTABLE(rsclient ${SCLIENT_SRC}) - TARGET_LINK_LIBRARIES(rsclient trpc) - - LIST(APPEND SERVER_SRC ./rserver.c) - ADD_EXECUTABLE(rserver ${SERVER_SRC}) - TARGET_LINK_LIBRARIES(rserver trpc) -ENDIF () - -IF (TD_DARWIN) - LIST(APPEND CLIENT_SRC ./rclient.c) - ADD_EXECUTABLE(rclient ${CLIENT_SRC}) - TARGET_LINK_LIBRARIES(rclient trpc) - - LIST(APPEND SCLIENT_SRC ./rsclient.c) - ADD_EXECUTABLE(rsclient ${SCLIENT_SRC}) - TARGET_LINK_LIBRARIES(rsclient trpc) - - LIST(APPEND SERVER_SRC ./rserver.c) - ADD_EXECUTABLE(rserver ${SERVER_SRC}) - TARGET_LINK_LIBRARIES(rserver trpc) -ENDIF () diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c deleted file mode 100644 index 2f4433f1bb..0000000000 --- a/src/rpc/test/rclient.c +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" -#include "tutil.h" -#include "tglobal.h" -#include "rpcLog.h" -#include "trpc.h" -#include "taoserror.h" - -typedef struct { - int index; - SRpcEpSet epSet; - int num; - int numOfReqs; - int msgSize; - tsem_t rspSem; - tsem_t *pOverSem; - pthread_t thread; - void *pRpc; -} SInfo; - -static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - - if (pEpSet) pInfo->epSet = *pEpSet; - - rpcFreeCont(pMsg->pCont); - tsem_post(&pInfo->rspSem); -} - -static int tcount = 0; - -static void *sendRequest(void *param) { - SInfo *pInfo = (SInfo *)param; - SRpcMsg rpcMsg = {0}; - - tDebug("thread:%d, start to send request", pInfo->index); - - while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { - pInfo->num++; - rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); - rpcMsg.contLen = pInfo->msgSize; - rpcMsg.ahandle = pInfo; - rpcMsg.msgType = 1; - tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); - if ( pInfo->num % 20000 == 0 ) - tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - tsem_wait(&pInfo->rspSem); - } - - tDebug("thread:%d, it is over", pInfo->index); - tcount++; - - return NULL; -} - -int main(int argc, char *argv[]) { - SRpcInit rpcInit; - SRpcEpSet epSet; - int msgSize = 128; - int numOfReqs = 0; - int appThreads = 1; - char serverIp[40] = "127.0.0.1"; - char secret[TSDB_KEY_LEN] = "mypassword"; - struct timeval systemTime; - int64_t startTime, endTime; - pthread_attr_t thattr; - - // server info - epSet.numOfEps = 1; - epSet.inUse = 0; - epSet.port[0] = 7000; - epSet.port[1] = 7000; - strcpy(epSet.fqdn[0], serverIp); - strcpy(epSet.fqdn[1], "192.168.0.1"); - - // client info - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "APP"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = processResponse; - rpcInit.sessions = 100; - rpcInit.idleTime = tsShellActivityTimer*1000; - rpcInit.user = "michael"; - rpcInit.secret = secret; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.connType = TAOS_CONN_CLIENT; - - for (int i=1; iindex = i; - pInfo->epSet = epSet; - pInfo->numOfReqs = numOfReqs; - pInfo->msgSize = msgSize; - tsem_init(&pInfo->rspSem, 0, 0); - pInfo->pRpc = pRpc; - pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); - pInfo++; - } - - do { - usleep(1); - } while ( tcount < appThreads); - - gettimeofday(&systemTime, NULL); - endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; - float usedTime = (endTime - startTime)/1000.0f; // mseconds - - tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); - tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); - - int ch = getchar(); - UNUSED(ch); - - taosCloseLog(); - - return 0; -} - - diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c deleted file mode 100644 index 65170d4abb..0000000000 --- a/src/rpc/test/rsclient.c +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - - -#include "os.h" -#include "tutil.h" -#include "tglobal.h" -#include "rpcLog.h" -#include "trpc.h" -#include "taoserror.h" - -typedef struct { - int index; - SRpcEpSet epSet; - int num; - int numOfReqs; - int msgSize; - tsem_t rspSem; - tsem_t *pOverSem; - pthread_t thread; - void *pRpc; -} SInfo; - - -static int tcount = 0; -static int terror = 0; - -static void *sendRequest(void *param) { - SInfo *pInfo = (SInfo *)param; - SRpcMsg rpcMsg, rspMsg; - - tDebug("thread:%d, start to send request", pInfo->index); - - while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { - pInfo->num++; - rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); - rpcMsg.contLen = pInfo->msgSize; - rpcMsg.handle = pInfo; - rpcMsg.msgType = 1; - tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - - rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg); - - // handle response - if (rspMsg.code != 0) terror++; - - tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code); - - rpcFreeCont(rspMsg.pCont); - - if ( pInfo->num % 20000 == 0 ) - tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - } - - tDebug("thread:%d, it is over", pInfo->index); - tcount++; - - return NULL; -} - -int main(int argc, char *argv[]) { - SRpcInit rpcInit; - SRpcEpSet epSet; - int msgSize = 128; - int numOfReqs = 0; - int appThreads = 1; - char serverIp[40] = "127.0.0.1"; - char secret[TSDB_KEY_LEN] = "mypassword"; - struct timeval systemTime; - int64_t startTime, endTime; - pthread_attr_t thattr; - - // server info - epSet.numOfEps = 1; - epSet.inUse = 0; - epSet.port[0] = 7000; - epSet.port[1] = 7000; - strcpy(epSet.fqdn[0], serverIp); - strcpy(epSet.fqdn[1], "192.168.0.1"); - - // client info - memset(&rpcInit, 0, sizeof(rpcInit)); - //rpcInit.localIp = "0.0.0.0"; - rpcInit.localPort = 0; - rpcInit.label = "APP"; - rpcInit.numOfThreads = 1; - rpcInit.sessions = 100; - rpcInit.idleTime = tsShellActivityTimer*1000; - rpcInit.user = "michael"; - rpcInit.secret = secret; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.connType = TAOS_CONN_CLIENT; - - for (int i=1; iindex = i; - pInfo->epSet = epSet; - pInfo->numOfReqs = numOfReqs; - pInfo->msgSize = msgSize; - tsem_init(&pInfo->rspSem, 0, 0); - pInfo->pRpc = pRpc; - pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); - pInfo++; - } - - do { - usleep(1); - } while ( tcount < appThreads); - - gettimeofday(&systemTime, NULL); - endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; - float usedTime = (endTime - startTime)/1000.0; // mseconds - - tInfo("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror); - tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); - - taosCloseLog(); - - return 0; -} - - diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c deleted file mode 100644 index 64960db044..0000000000 --- a/src/rpc/test/rserver.c +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -//#define _DEFAULT_SOURCE -#include "os.h" -#include "tglobal.h" -#include "rpcLog.h" -#include "trpc.h" -#include "tqueue.h" - -int msgSize = 128; -int commit = 0; -int dataFd = -1; -void *qhandle = NULL; -void *qset = NULL; - -void processShellMsg() { - static int num = 0; - taos_qall qall; - SRpcMsg *pRpcMsg, rpcMsg; - int type; - void *pvnode; - - qall = taosAllocateQall(); - - while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode); - tDebug("%d shell msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; - - for (int i=0; i=0) { - if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { - tInfo("failed to write data file, reason:%s", strerror(errno)); - } - } - } - - if (commit >=2) { - num += numOfMsgs; - if ( taosFsync(dataFd) < 0 ) { - tInfo("failed to flush data to file, reason:%s", strerror(errno)); - } - - if (num % 10000 == 0) { - tInfo("%d request have been written into disk", num); - } - } - - taosResetQitems(qall); - for (int i=0; ipCont); - - memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.pCont = rpcMallocCont(msgSize); - rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 0; - rpcSendResponse(&rpcMsg); - - taosFreeQitem(pRpcMsg); - } - - } - - taosFreeQall(qall); - -} - -int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { - // app shall retrieve the auth info based on meterID from DB or a data file - // demo code here only for simple demo - int ret = 0; - - if (strcmp(meterId, "michael") == 0) { - *spi = 1; - *encrypt = 0; - strcpy(secret, "mypassword"); - strcpy(ckey, "key"); - } else if (strcmp(meterId, "jeff") == 0) { - *spi = 0; - *encrypt = 0; - } else { - ret = -1; // user not there - } - - return ret; -} - -void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); -} - -int main(int argc, char *argv[]) { - SRpcInit rpcInit; - char dataName[20] = "server.data"; - - taosBlockSIGPIPE(); - - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 7000; - rpcInit.label = "SER"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = processRequestMsg; - rpcInit.sessions = 1000; - rpcInit.idleTime = tsShellActivityTimer*1500; - rpcInit.afp = retrieveAuthInfo; - - for (int i=1; i= 0) { - close(dataFd); - remove(dataName); - } - - return 0; -}