From 71729218863c06a0861ad55e6b2cbf479444a56d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 19 Jan 2022 21:07:29 +0800 Subject: [PATCH 01/10] add client --- source/libs/transport/src/transCli.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f197e72ec5..2ecc9e2343 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -53,7 +53,7 @@ typedef struct SClientObj { static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); -static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void clientWriteCb(uv_write_t* req, int status); static void clientConnCb(uv_connect_t* req, int status); @@ -65,7 +65,7 @@ static void* clientThread(void* arg); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { // impl later } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -96,7 +96,7 @@ static void clientWriteCb(uv_write_t* req, int status) { return; } - uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); + uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); // impl later } @@ -121,8 +121,6 @@ static void clientConnCb(uv_connect_t* req, int status) { SCliMsg* pMsg = pConn->data; SEpSet* pEpSet = &pMsg->context->epSet; SRpcMsg rpcMsg; - // rpcMsg.ahandle = pMsg->context->ahandle; - // rpcMsg.pCont = NULL; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; @@ -135,6 +133,7 @@ static void clientConnCb(uv_connect_t* req, int status) { return; } assert(pConn->stream == req->handle); + clientWrite(pConn); } static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { From c063ea6913f35e9f9bc335d432bdbdca5ebe4823 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jan 2022 18:38:25 +0800 Subject: [PATCH 02/10] add client --- source/libs/transport/inc/transComm.h | 103 ++++++++--- source/libs/transport/inc/transportInt.h | 3 + source/libs/transport/src/trans.c | 13 +- source/libs/transport/src/transCli.c | 105 ++++++------ source/libs/transport/src/transComm.c | 72 ++++++++ source/libs/transport/src/transSrv.c | 59 ++++--- source/libs/transport/test/uv.c | 207 +++++++++++++++++++++++ 7 files changed, 461 insertions(+), 101 deletions(-) create mode 100644 source/libs/transport/test/uv.c diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 4b14f9f2c7..506b085ecd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -102,38 +102,101 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) typedef struct { - SRpcInfo* pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - struct SRpcConn* pConn; // pConn allocated - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t 
contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API - SEpSet* pSet; // for synchronous API - char msg[0]; // RpcHead starts from here + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + // struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + // int32_t code; // error code + // int16_t numOfTry; // number of try for different servers + // int8_t oldInUse; // server EP inUse passed by app + // int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + char* ip; + uint32_t port; + // SEpSet* pSet; // for synchronous API } SRpcReqContext; -#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) +typedef struct { + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + // struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + // int32_t code; // error code + // int16_t numOfTry; // number of try for different servers + // int8_t oldInUse; // server EP inUse passed by app + // int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + char* ip; + uint32_t port; + // SEpSet* pSet; // for synchronous API +} STransConnCtx; -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#pragma pack(push, 1) + +typedef struct { + char version : 4; // RPC version + char comp : 4; // compression algorithm, 0:no compression 1:lz4 + char resflag : 2; // reserved bits + char spi : 3; // security parameter index + char encrypt : 3; // encrypt algorithm, 0: no encryption + + uint32_t code; // del later + uint32_t msgType; + int32_t msgLen; + uint8_t content[0]; // message body starts from here +} STransMsgHead; + +typedef struct { + int32_t reserved; + int32_t contLen; +} STransCompMsg; + +typedef struct { + uint32_t timeStamp; + uint8_t auth[TSDB_AUTH_LEN]; +} STransDigestMsg; + +#pragma pack(pop) + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) + +#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest)) #define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcIsReq(type) (type & 1U) +#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) + +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg)) +#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) +#define 
transContFromHead(msg) (msg + sizeof(STransMsgHead)) +#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) +#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); +#define transIsReq(type) (type & 1U) + int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); +int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +bool transCompressMsg(char* msg, int32_t len, int32_t* flen); +bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); + #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e39e0d9273..3c8c922d83 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -45,6 +45,9 @@ extern "C" { void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void taosCloseServer(void* arg); +void taosCloseClient(void* arg); + typedef struct { int sessions; // number of sessions allowed int numOfThreads; // number of threads to process incoming messages diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index cb8ef87b48..48398316f1 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -24,8 +24,9 @@ typedef struct SConnBuffer { int left; } SConnBuffer; -void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { +void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { taosInitServer, taosInitClient}; +void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); @@ -38,11 +39,15 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->cfp = pInit->cfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? 
TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->connType = pInit->connType; - pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } -void rpcClose(void* arg) { return; } +void rpcClose(void* arg) { + SRpcInfo* pRpc = (SRpcInfo*)arg; + (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); + return; +} void* rpcMallocCont(int contLen) { int size = contLen + RPC_MSG_OVERHEAD; @@ -53,7 +58,7 @@ void* rpcMallocCont(int contLen) { } else { tTrace("malloc mem:%p size:%d", start, size); } - return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); + return start + sizeof(STransMsgHead); } void rpcFreeCont(void* cont) { return; } void* rpcReallocCont(void* ptr, int contLen) { return NULL; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2ecc9e2343..8622db9b3f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,10 +26,12 @@ typedef struct SCliConn { char spi; char secured; } SCliConn; + typedef struct SCliMsg { - SRpcReqContext* context; - queue q; - uint64_t st; + STransConnCtx* ctx; + SRpcMsg msg; + queue q; + uint64_t st; } SCliMsg; typedef struct SCliThrdObj { @@ -74,6 +76,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { return; } + // uv_close((uv_handle_t*)handle, clientDestroy); } @@ -101,10 +104,12 @@ static void clientWriteCb(uv_write_t* req, int status) { } static void clientWrite(SCliConn* pConn) { - SCliMsg* pMsg = pConn->data; - SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont); - int msgLen = rpcMsgLenFromCont(pMsg->context->contLen); - char* msg = (char*)(pHead); + SCliMsg* pCliMsg = pConn->data; + SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + + int msgLen = transMsgLenFromCont(pMsg->contLen); + char* msg = (char*)(pHead); uv_buf_t wb = uv_buf_init(msg, msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); @@ -118,17 +123,17 @@ static void clientConnCb(uv_connect_t* req, int status) { return; } - SCliMsg* pMsg = pConn->data; - SEpSet* pEpSet = &pMsg->context->epSet; - SRpcMsg rpcMsg; + SCliMsg* pMsg = pConn->data; + STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx; + + SRpcMsg rpcMsg; + rpcMsg.ahandle = pCtx->ahandle; - char* fqdn = pEpSet->fqdn[pEpSet->inUse]; - uint32_t port = pEpSet->port[pEpSet->inUse]; if (status != 0) { // call user fp later - tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); - SRpcInfo* pRpc = pMsg->context->pRpc; - (pRpc->cfp)(NULL, &rpcMsg, pEpSet); + tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); + SRpcInfo* pRpc = pMsg->ctx->pRpc; + (pRpc->cfp)(NULL, &rpcMsg, NULL); uv_close((uv_handle_t*)req->handle, clientDestroy); return; } @@ -146,22 +151,18 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { - SEpSet* pEpSet = &pMsg->context->epSet; - - char* fqdn = pEpSet->fqdn[pEpSet->inUse]; - uint32_t port = pEpSet->port[pEpSet->inUse]; - - uint64_t el = taosGetTimestampUs() - pMsg->st; + uint64_t et = taosGetTimestampUs(); + uint64_t el = et - pMsg->st; tDebug("msg tran time cost: %" PRIu64 "", el); + et = taosGetTimestampUs(); - 
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); + STransConnCtx* pCtx = pMsg->ctx; + SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later conn->data = pMsg; conn->writeReq->data = conn; clientWrite(conn); - // uv_buf_t wb; - // uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); } else { SCliConn* conn = malloc(sizeof(SCliConn)); @@ -171,23 +172,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->connReq.data = conn; conn->data = pMsg; + struct sockaddr_in addr; - uv_ip4_addr(fqdn, port, &addr); + uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); - - // SRpcMsg rpcMsg; - // SEpSet* pEpSet = &pMsg->context->epSet; - // SRpcInfo* pRpc = pMsg->context->pRpc; - //// rpcMsg.ahandle = pMsg->context->ahandle; - // rpcMsg.pCont = NULL; - // rpcMsg.ahandle = pMsg->context->ahandle; - // uint64_t el1 = taosGetTimestampUs() - et; - // tError("msg tran back first: time cost: %" PRIu64 "", el1); - // et = taosGetTimestampUs(); - //(pRpc->cfp)(NULL, &rpcMsg, pEpSet); - // uint64_t el2 = taosGetTimestampUs() - et; - // tError("msg tran back second: time cost: %" PRIu64 "", el2); } } static void clientAsyncCb(uv_async_t* handle) { @@ -204,7 +193,8 @@ static void clientAsyncCb(uv_async_t* handle) { while (!QUEUE_IS_EMPTY(&wq)) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); - pMsg = QUEUE_DATA(h, SCliMsg, q); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); clientHandleReq(pMsg, pThrd); count++; if (count >= 2) { @@ -220,6 +210,7 @@ static void* clientThread(void* arg) { void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SClientObj* cli = calloc(1, sizeof(SClientObj)); + memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); @@ -244,22 +235,31 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return cli; } +void taosCloseClient(void* arg) { + // impl later + SClientObj* cli = arg; +} void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later + char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]); + uint32_t port = pEpSet->port[pEpSet->inUse]; + SRpcInfo* pRpc = (SRpcInfo*)shandle; int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); - SRpcReqContext* pContext; - pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); - pContext->ahandle = pMsg->ahandle; - pContext->pRpc = (SRpcInfo*)shandle; - pContext->epSet = *pEpSet; - pContext->contLen = len; - pContext->pCont = pMsg->pCont; - pContext->msgType = pMsg->msgType; - pContext->oldInUse = pEpSet->inUse; + STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); + + pCtx->pRpc = (SRpcInfo*)shandle; + pCtx->ahandle = pMsg->ahandle; + // pContext->contLen = len; + // pContext->pCont = pMsg->pCont; + pCtx->msgType = pMsg->msgType; + pCtx->ip = strdup(ip); + pCtx->port = port; + // pContext->epSet = *pEpSet; + // pContext->oldInUse = pEpSet->inUse; assert(pRpc->connType == TAOS_CONN_CLIENT); // atomic or not @@ -267,14 +267,15 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* if (pRpc->index++ >= pRpc->numOfThreads) { pRpc->index = 0; } - SCliMsg* msg = malloc(sizeof(SCliMsg)); - msg->context = 
pContext; - msg->st = taosGetTimestampUs(); + SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->msg = *pMsg; + cliMsg->st = taosGetTimestampUs(); SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; pthread_mutex_lock(&thrd->msgMtx); - QUEUE_PUSH(&thrd->msg, &msg->q); + QUEUE_PUSH(&thrd->msg, &cliMsg->q); pthread_mutex_unlock(&thrd->msgMtx); uv_async_send(thrd->cliAsync); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index f23cfb6e2d..27cff80586 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -30,6 +30,20 @@ int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { return ret; } +int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + int ret = -1; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; + + return ret; +} void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; @@ -41,6 +55,17 @@ void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { memcpy(pAuth, context.digest, sizeof(context.digest)); } +void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + memcpy(pAuth, context.digest, sizeof(context.digest)); +} int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { SRpcHead* pHead = rpcHeadFromCont(pCont); @@ -81,6 +106,53 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { return finalLen; } +bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { + // SRpcHead* pHead = rpcHeadFromCont(pCont); + bool succ = false; + int overhead = sizeof(STransCompMsg); + if (!NEEDTO_COMPRESSS_MSG(len)) { + return succ; + } + + char* buf = malloc(len + overhead + 8); // 8 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d", len); + *flen = len; + return succ; + } + + int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead); + tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead); + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (clen > 0 && clen < len - overhead) { + STransCompMsg* pComp = (STransCompMsg*)msg; + pComp->reserved = 0; + pComp->contLen = htonl(len); + memcpy(msg + overhead, buf, clen); + + tDebug("compress rpc msg, before:%d, after:%d", len, clen); + *flen = clen + overhead; + succ = true; + } else { + *flen = len; + succ = false; + } + free(buf); + return succ; +} +bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { + // impl later + return false; + STransCompMsg* pComp = (STransCompMsg*)msg; + + int overhead = sizeof(STransCompMsg); + int clen = 0; + return false; +} + SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { int overhead = sizeof(SRpcComp); SRpcHead* pNewHead = NULL; diff --git a/source/libs/transport/src/transSrv.c 
b/source/libs/transport/src/transSrv.c index bc4cc695b0..46caffd93c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -107,24 +107,24 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b SConn* conn = handle->data; SConnBuffer* pBuf = &conn->connBuf; if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); + pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); pBuf->len = 0; pBuf->cap = CAPACITY; pBuf->left = -1; - buf->base = pBuf->buf + RPC_RESERVE_SIZE; + buf->base = pBuf->buf; buf->len = CAPACITY; } else { if (pBuf->len >= pBuf->cap) { if (pBuf->left == -1) { pBuf->cap *= 2; - pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); + pBuf->buf = realloc(pBuf->buf, pBuf->cap); } else if (pBuf->len + pBuf->left > pBuf->cap) { pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); } } - buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; + buf->base = pBuf->buf + pBuf->len; buf->len = pBuf->cap - pBuf->len; } } @@ -133,11 +133,12 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later - SRpcHead rpcHead; - int32_t headLen = sizeof(rpcHead); + // SRpcHead rpcHead; + STransMsgHead head; + int32_t headLen = sizeof(head); if (data->len >= headLen) { - memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + memcpy((char*)&head, data->buf, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); if (msgLen > data->len) { data->left = msgLen - data->len; return false; @@ -150,21 +151,21 @@ static bool readComplete(SConnBuffer* data) { } static void uvDoProcess(SRecvInfo* pRecv) { - SRpcHead* pHead = (SRpcHead*)pRecv->msg; - SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; - SConn* pConn = pRecv->thandle; - + // impl later + STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; + SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; + SConn* pConn = pRecv->thandle; tDump(pRecv->msg, pRecv->msgLen); - terrno = 0; - SRpcReqContext* pContest; + // SRpcReqContext* pContest; // do auth and check } static int uvAuthMsg(SConn* pConn, char* msg, int len) { - SRpcHead* pHead = (SRpcHead*)msg; - int code = 0; + STransMsgHead* pHead = (STransMsgHead*)msg; + + int code = 0; if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { // secured link, or no authentication @@ -224,7 +225,7 @@ static void uvProcessData(SConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; SConnBuffer* pBuf = &pConn->connBuf; - p->msg = pBuf->buf + RPC_RESERVE_SIZE; + p->msg = pBuf->buf; p->msgLen = pBuf->len; p->ip = 0; p->port = 0; @@ -233,11 +234,10 @@ static void uvProcessData(SConn* pConn) { p->chandle = NULL; // - SRpcHead* pHead = (SRpcHead*)p->msg; - assert(rpcIsReq(pHead->msgType)); + STransMsgHead* pHead = (STransMsgHead*)p->msg; + assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; - pConn->ahandle = (void*)pHead->ahandle; // auth here int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); @@ -247,14 +247,19 @@ static void uvProcessData(SConn* pConn) { } pHead->code = htonl(pHead->code); + int32_t dlen = 0; SRpcMsg rpcMsg; - - pHead = rpcDecompressRpcMsg(pHead); + if (transDecompressMsg(NULL, 0, NULL)) { + // add compress later + // pHead = 
rpcDecompressRpcMsg(pHead); + } else { + // impl later + } rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; - rpcMsg.ahandle = pConn->ahandle; + rpcMsg.ahandle = NULL; rpcMsg.handle = pConn; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); @@ -423,7 +428,7 @@ void* workerThread(void* arg) { uv_loop_init(pThrd->loop); // SRpcInfo* pRpc = pThrd->shandle; - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_init(pThrd->loop, pThrd->pipe, 0); uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; @@ -522,6 +527,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, return srv; } +void taosCloseServer(void* arg) { + // impl later + SServerObj* srv = arg; +} void rpcSendResponse(const SRpcMsg* pMsg) { SConn* pConn = pMsg->handle; diff --git a/source/libs/transport/test/uv.c b/source/libs/transport/test/uv.c new file mode 100644 index 0000000000..4c7d30900b --- /dev/null +++ b/source/libs/transport/test/uv.c @@ -0,0 +1,207 @@ +#include +#include +#include +#include +#include + +#include "task.h" +#include + +#define NUM_OF_THREAD 1 +#define TIMEOUT 10000 + +typedef struct SThreadObj { + pthread_t thread; + uv_pipe_t *pipe; + uv_loop_t *loop; + uv_async_t *workerAsync; // + int fd; +} SThreadObj; + +typedef struct SServerObj { + uv_tcp_t server; + uv_loop_t *loop; + int workerIdx; + int numOfThread; + SThreadObj **pThreadObj; + uv_pipe_t **pipe; +} SServerObj; + +typedef struct SConnCtx { + uv_tcp_t *pClient; + uv_timer_t *pTimer; + uv_async_t *pWorkerAsync; + int ref; +} SConnCtx; + +void echo_write(uv_write_t *req, int status) { + if (status < 0) { + fprintf(stderr, "Write error %s\n", uv_err_name(status)); + } + printf("write data to client\n"); + free(req); +} + +void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { + + SConnCtx *pConn = container_of(client, SConnCtx, pClient); + pConn->ref += 1; + printf("read data %d\n", nread, buf->base, buf->len); + if (nread > 0) { + uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t)); + // dispatch request to database other process thread + // just write out + uv_buf_t write_out; + write_out.base = buf->base; + write_out.len = nread; + uv_write((uv_write_t *)req, client, &write_out, 1, echo_write); + free(buf->base); + return; + } + + if (nread < 0) { + if (nread != UV_EOF) + fprintf(stderr, "Read error %s\n", uv_err_name(nread)); + uv_close((uv_handle_t *)client, NULL); + } + free(buf->base); +} + +void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + buf->base = malloc(suggested_size); + buf->len = suggested_size; +} + +void on_new_connection(uv_stream_t *s, int status) { + if (status == -1) { + // error! 
+ return; + } + SServerObj *pObj = container_of(s, SServerObj, server); + printf("new_connection from client\n"); + + uv_tcp_t *client = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, client); + if (uv_accept(s, (uv_stream_t *)client) == 0) { + uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t)); + uv_buf_t dummy_buf = uv_buf_init("a", 1); + // despatch to worker thread + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + uv_write2(write_req, (uv_stream_t *)&(pObj->pipe[pObj->workerIdx][0]), + &dummy_buf, 1, (uv_stream_t *)client, echo_write); + } else { + uv_close((uv_handle_t *)client, NULL); + } +} +void child_on_new_connection(uv_stream_t *q, ssize_t nread, + const uv_buf_t *buf) { + printf("x child_on_new_connection \n"); + if (nread < 0) { + if (nread != UV_EOF) + fprintf(stderr, "Read error %s\n", uv_err_name(nread)); + uv_close((uv_handle_t *)q, NULL); + return; + } + SThreadObj *pObj = (SThreadObj *)container_of(q, struct SThreadObj, pipe); + + uv_pipe_t *pipe = (uv_pipe_t *)q; + if (!uv_pipe_pending_count(pipe)) { + fprintf(stderr, "No pending count\n"); + return; + } + + uv_handle_type pending = uv_pipe_pending_type(pipe); + assert(pending == UV_TCP); + + SConnCtx *pConn = malloc(sizeof(SConnCtx)); + + /* init conn timer*/ + pConn->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pObj->loop, pConn->pTimer); + + pConn->pClient = (uv_tcp_t *)malloc(sizeof(uv_tcp_t)); + pConn->pWorkerAsync = pObj->workerAsync; // thread safty + uv_tcp_init(pObj->loop, pConn->pClient); + + if (uv_accept(q, (uv_stream_t *)(pConn->pClient)) == 0) { + uv_os_fd_t fd; + uv_fileno((const uv_handle_t *)pConn->pClient, &fd); + fprintf(stderr, "Worker Accepted fd %d\n", fd); + uv_timer_start(pConn->pTimer, timeOutCallBack, TIMEOUT, 0); + uv_read_start((uv_stream_t *)(pConn->pClient), alloc_buffer, echo_read); + } else { + uv_timer_stop(pConn->pTimer); + free(pConn->pTimer); + uv_close((uv_handle_t *)pConn->pClient, NULL); + free(pConn->pClient); + free(pConn); + } +} + +static void workerAsyncCallback(uv_async_t *handle) { + SThreadObj *pObj = container_of(handle, SThreadObj, workerAsync); + // do nothing +} +void *worker_thread(void *arg) { + SThreadObj *pObj = (SThreadObj *)arg; + int fd = pObj->fd; + pObj->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t)); + uv_loop_init(pObj->loop); + + uv_pipe_init(pObj->loop, pObj->pipe, 1); + uv_pipe_open(pObj->pipe, fd); + + pObj->workerAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCallback); + uv_read_start((uv_stream_t *)pObj->pipe, alloc_buffer, + child_on_new_connection); + + uv_run(pObj->loop, UV_RUN_DEFAULT); +} +int main() { + + SServerObj *server = calloc(1, sizeof(SServerObj)); + server->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t)); + server->numOfThread = NUM_OF_THREAD; + server->workerIdx = 0; + server->pThreadObj = + (SThreadObj **)calloc(server->numOfThread, sizeof(SThreadObj *)); + server->pipe = (uv_pipe_t **)calloc(server->numOfThread, sizeof(uv_pipe_t *)); + + uv_loop_init(server->loop); + + for (int i = 0; i < server->numOfThread; i++) { + server->pThreadObj[i] = (SThreadObj *)calloc(1, sizeof(SThreadObj)); + server->pipe[i] = (uv_pipe_t *)calloc(2, sizeof(uv_pipe_t)); + int fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, + UV_NONBLOCK_PIPE) != 0) { + return -1; + } + uv_pipe_init(server->loop, &(server->pipe[i][0]), 1); + uv_pipe_open(&(server->pipe[i][0]), fds[1]); // init write + + server->pThreadObj[i]->fd = fds[0]; + 
server->pThreadObj[i]->pipe = &(server->pipe[i][1]); // init read + int err = pthread_create(&(server->pThreadObj[i]->thread), NULL, + worker_thread, (void *)(server->pThreadObj[i])); + if (err == 0) { + printf("thread %d create\n", i); + } else { + printf("thread %d create failed", i); + } + + uv_tcp_init(server->loop, &server->server); + struct sockaddr_in bind_addr; + uv_ip4_addr("0.0.0.0", 7000, &bind_addr); + uv_tcp_bind(&server->server, (const struct sockaddr *)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t *)&server->server, 128, + on_new_connection)) != 0) { + fprintf(stderr, "Listen error %s\n", uv_err_name(err)); + return 2; + } + uv_run(server->loop, UV_RUN_DEFAULT); + return 0; + } +} From 37dcc3deb5827554740095e95ecb345a6a68cdf9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jan 2022 19:19:00 +0800 Subject: [PATCH 03/10] refactor rpc --- source/libs/transport/test/rsclient.c | 196 ++++++++++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 source/libs/transport/test/rsclient.c diff --git a/source/libs/transport/test/rsclient.c b/source/libs/transport/test/rsclient.c new file mode 100644 index 0000000000..65170d4abb --- /dev/null +++ b/source/libs/transport/test/rsclient.c @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + + +#include "os.h" +#include "tutil.h" +#include "tglobal.h" +#include "rpcLog.h" +#include "trpc.h" +#include "taoserror.h" + +typedef struct { + int index; + SRpcEpSet epSet; + int num; + int numOfReqs; + int msgSize; + tsem_t rspSem; + tsem_t *pOverSem; + pthread_t thread; + void *pRpc; +} SInfo; + + +static int tcount = 0; +static int terror = 0; + +static void *sendRequest(void *param) { + SInfo *pInfo = (SInfo *)param; + SRpcMsg rpcMsg, rspMsg; + + tDebug("thread:%d, start to send request", pInfo->index); + + while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.handle = pInfo; + rpcMsg.msgType = 1; + tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + + rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg); + + // handle response + if (rspMsg.code != 0) terror++; + + tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code); + + rpcFreeCont(rspMsg.pCont); + + if ( pInfo->num % 20000 == 0 ) + tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + } + + tDebug("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SRpcEpSet epSet; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + char secret[TSDB_KEY_LEN] = "mypassword"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + epSet.numOfEps = 1; + epSet.inUse = 0; + epSet.port[0] = 7000; + epSet.port[1] = 7000; + strcpy(epSet.fqdn[0], serverIp); + strcpy(epSet.fqdn[1], "192.168.0.1"); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + //rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + rpcInit.sessions = 100; + rpcInit.idleTime = tsShellActivityTimer*1000; + rpcInit.user = "michael"; + rpcInit.secret = secret; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i=1; iindex = i; + pInfo->epSet = epSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + tsem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while ( tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime)/1000.0; // mseconds + + tInfo("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror); + tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + + taosCloseLog(); + + return 0; +} + + From 5e8cb50a20f296fb3f631c13dc4e2f93558faeb4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jan 2022 20:01:27 +0800 Subject: [PATCH 04/10] refactor rpc --- source/libs/transport/inc/transComm.h | 2 ++ source/libs/transport/src/transCli.c | 24 +++++++++++++++++++----- source/libs/transport/src/transComm.c | 5 +++++ source/libs/transport/src/transSrv.c | 12 ++++++++++++ 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 506b085ecd..4d3a1b70cb 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -199,4 +199,6 @@ void 
transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); +void transConnCtxDestroy(STransConnCtx* ctx); + #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8622db9b3f..c96a0f81e0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,6 +63,8 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn); +static void clientMsgDestroy(SCliMsg* pMsg); + static void* clientThread(void* arg); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); @@ -235,9 +237,22 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return cli; } +static void clientMsgDestroy(SCliMsg* pMsg) { + // impl later + free(pMsg); +} void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; + for (int i = 0; i < cli->numOfThreads; i++) { + SCliThrdObj* pThrd = cli->pThreadObj[i]; + pthread_join(pThrd->thread, NULL); + pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->loop); + free(pThrd); + } + free(cli->pThreadObj); + free(cli); } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { @@ -247,19 +262,18 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SRpcInfo* pRpc = (SRpcInfo*)shandle; - int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + int32_t flen = 0; + if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { + // imp later + } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->pRpc = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; - // pContext->contLen = len; - // pContext->pCont = pMsg->pCont; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; - // pContext->epSet = *pEpSet; - // pContext->oldInUse = pEpSet->inUse; assert(pRpc->connType == TAOS_CONN_CLIENT); // atomic or not diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 27cff80586..617abeea39 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -107,6 +107,7 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { } bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { + return false; // SRpcHead* pHead = rpcHeadFromCont(pCont); bool succ = false; int overhead = sizeof(STransCompMsg); @@ -186,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { return pHead; } +void transConnCtxDestroy(STransConnCtx* ctx) { + free(ctx->ip); + free(ctx); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 46caffd93c..f4625dc0b3 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -496,6 +496,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { @@ -530,6 +531,17 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrdObj* pThrd = 
srv->pThreadObj[i]; + pthread_join(pThrd->thread, NULL); + free(srv->pipe[i]); + free(pThrd->loop); + free(pThrd); + } + free(srv->loop); + free(srv->pipe); + free(srv->pThreadObj); + pthread_join(srv->thread, NULL); } void rpcSendResponse(const SRpcMsg* pMsg) { From f2f218402f5f9d1f1744344ff0ac13a6a9dd03e7 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 20 Jan 2022 20:34:51 +0800 Subject: [PATCH 05/10] [modify] --- tests/test/c/create_table.c | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index 559367a275..ab4ef32854 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -181,9 +181,20 @@ void *threadFunc(void *param) { pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(NULL)); exit(1); } + + pError("====before thread:%d, table range: %"PRId64 " - %"PRId64 "\n", + pInfo->threadIndex, + pInfo->tableBeginIndex, + pInfo->tableEndIndex); - // printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, - // pInfo->tableEndIndex); + pInfo->tableBeginIndex += startOffset; + pInfo->tableEndIndex += startOffset; + + pError("====after thread:%d, table range: %"PRId64 " - %"PRId64 "\n", + pInfo->threadIndex, + pInfo->tableBeginIndex, + pInfo->tableEndIndex); + sprintf(qstr, "use %s", pInfo->dbName); TAOS_RES *pRes = taos_query(con, qstr); taos_free_result(pRes); @@ -211,7 +222,7 @@ void *threadFunc(void *param) { TAOS_RES *pRes = taos_query(con, qstr); code = taos_errno(pRes); if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { - pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); + pError("failed to create table reason:%s, sql: %s", tstrerror(code), qstr); } taos_free_result(pRes); int64_t endTs = taosGetTimestampUs(); @@ -297,7 +308,7 @@ void printHelp() { printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads); printf("%s%s\n", indent, "-n"); printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables); - printf("%s%s\n", indent, "-o"); + printf("%s%s\n", indent, "-g"); printf("%s%s%s%" PRId64 "\n", indent, indent, "startOffset, default is ", startOffset); printf("%s%s\n", indent, "-v"); printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups); @@ -332,7 +343,7 @@ void parseArgument(int32_t argc, char *argv[]) { numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { numOfTables = atoll(argv[++i]); - } else if (strcmp(argv[i], "-o") == 0) { + } else if (strcmp(argv[i], "-g") == 0) { startOffset = atoll(argv[++i]); } else if (strcmp(argv[i], "-v") == 0) { numOfVgroups = atoi(argv[++i]); @@ -412,8 +423,8 @@ int32_t main(int32_t argc, char *argv[]) { int64_t tableFrom = 0; for (int32_t i = 0; i < numOfThreads; ++i) { - pInfo[i].tableBeginIndex = tableFrom + startOffset; - pInfo[i].tableEndIndex = (i < b ? tableFrom + a : tableFrom + a - 1) + startOffset; + pInfo[i].tableBeginIndex = tableFrom; + pInfo[i].tableEndIndex = (i < b ? 
tableFrom + a : tableFrom + a - 1); tableFrom = pInfo[i].tableEndIndex + 1; pInfo[i].threadIndex = i; pInfo[i].minDelay = INT64_MAX; From 906e0e785fe2a3cd64e54ab3321e93f0e8f3bc21 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jan 2022 20:54:08 +0800 Subject: [PATCH 06/10] refactor rpc --- source/libs/transport/inc/transComm.h | 7 +++ source/libs/transport/src/trans.c | 10 +--- source/libs/transport/src/transCli.c | 68 ++++++++++++++++++++++++++- source/libs/transport/src/transSrv.c | 18 +++---- 4 files changed, 81 insertions(+), 22 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 4d3a1b70cb..c760acd52e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -201,4 +201,11 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); void transConnCtxDestroy(STransConnCtx* ctx); +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 48398316f1..cf1e153965 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -17,13 +17,6 @@ #include "transComm.h" -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { taosInitServer, taosInitClient}; void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; @@ -46,10 +39,11 @@ void* rpcOpen(const SRpcInit* pInit) { void rpcClose(void* arg) { SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); + free(pRpc); return; } void* rpcMallocCont(int contLen) { - int size = contLen + RPC_MSG_OVERHEAD; + int size = contLen + TRANS_MSG_OVERHEAD; char* start = (char*)calloc(1, (size_t)size); if (start == NULL) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c96a0f81e0..86e9c05ccb 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,6 +21,7 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; + SConnBuffer readBuf; void* data; queue conn; char spi; @@ -55,9 +56,17 @@ typedef struct SClientObj { static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +// process data read from server, auth/decompress etc +static void clientProcessData(SCliConn* conn); +// check whether already read complete packet from server +static bool clientReadComplete(SConnBuffer* pBuf); +// alloc buf for read static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +// callback after read nbytes from socket static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +// callback after write data to socket static void clientWriteCb(uv_write_t* req, int status); +// callback after conn to server static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); @@ -67,18 +76,72 @@ static void clientMsgDestroy(SCliMsg* pMsg); static void* clientThread(void* arg); +static void clientProcessData(SCliConn* conn) { + // impl +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static bool clientReadComplete(SConnBuffer* data) { + 
STransMsgHead head; + int32_t headLen = sizeof(head); + if (data->len >= headLen) { + memcpy((char*)&head, data->buf, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; + return false; + } else { + return true; + } + } else { + return false; + } +} static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { // impl later + static const int CAPACITY = 512; + + SCliConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; + buf->base = pBuf->buf; + buf->len = CAPACITY; + } else { + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); + } + } + buf->base = pBuf->buf + pBuf->len; + buf->len = pBuf->cap - pBuf->len; + } } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later - SCliConn* conn = handle->data; + SCliConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { + pBuf->len += nread; + if (clientReadComplete(pBuf)) { + tDebug("alread read complete pack"); + clientProcessData(conn); + } else { + tDebug("read halp packet, continue to read"); + } return; } + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } // uv_close((uv_handle_t*)handle, clientDestroy); } @@ -166,7 +229,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->writeReq->data = conn; clientWrite(conn); } else { - SCliConn* conn = malloc(sizeof(SCliConn)); + SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); @@ -248,6 +311,7 @@ void taosCloseClient(void* arg) { SCliThrdObj* pThrd = cli->pThreadObj[i]; pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->cliAsync); free(pThrd->loop); free(pThrd); } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f4625dc0b3..d096ab7813 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -16,13 +16,6 @@ #ifdef USE_UV #include "transComm.h" -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; @@ -100,7 +93,8 @@ static void* acceptThread(void* arg); void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* * formate of data buffer: - * |<-------SRpcReqContext------->|<------------data read from socket----------->| + * |<--------------------------data from socket------------------------------->| + * |<------STransMsgHead------->|<-------------------other data--------------->| */ static const int CAPACITY = 1024; @@ -133,7 +127,6 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later - // SRpcHead rpcHead; STransMsgHead head; int32_t headLen = sizeof(head); if (data->len >= headLen) { @@ -270,13 +263,13 @@ static void uvProcessData(SConn* pConn) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SConn* ctx = cli->data; - 
SConnBuffer* pBuf = &ctx->connBuf; + SConn* conn = cli->data; + SConnBuffer* pBuf = &conn->connBuf; if (nread > 0) { pBuf->len += nread; if (readComplete(pBuf)) { tDebug("alread read complete packet"); - uvProcessData(ctx); + uvProcessData(conn); } else { tDebug("read half packet, continue to read"); } @@ -542,6 +535,7 @@ void taosCloseServer(void* arg) { free(srv->pipe); free(srv->pThreadObj); pthread_join(srv->thread, NULL); + free(srv); } void rpcSendResponse(const SRpcMsg* pMsg) { From bcd6df15ef20a80829f0e3a26f66799191bdf960 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 20 Jan 2022 22:16:07 +0800 Subject: [PATCH 07/10] refactor rpc --- source/libs/transport/src/transCli.c | 63 ++++++++++++++++++++++++---- source/libs/transport/src/transSrv.c | 18 ++++---- source/libs/transport/test/rclient.c | 2 + 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 86e9c05ccb..2b2fcb557f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -52,7 +52,14 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; +typedef struct SConnList { + queue conn; +} SConnList; + // conn pool +// add expire timeout and capacity limit +static void* connCacheCreate(int size); +static void* connCacheDestroy(void* cache); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); @@ -81,6 +88,53 @@ static void clientProcessData(SCliConn* conn) { } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void* connCacheCreate(int size) { + SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + return false; +} +static void* connCacheDestroy(void* cache) { + SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); + while (!QUEUE_IS_EMPTY(&connList->conn)) { + queue* h = QUEUE_HEAD(&connList->conn); + QUEUE_REMOVE(h); + SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + clientConnDestroy(c); + } + taosHashClear(cache); +} + +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { + char key[128] = {0}; + tstrncpy(key, ip, strlen(ip)); + tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + + SHashObj* pCache = cache; + SConnList* plist = taosHashGet(pCache, key, strlen(key)); + if (plist == NULL) { + SConnList list; + plist = &list; + QUEUE_INIT(&plist->conn); + taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + } + + if (QUEUE_IS_EMPTY(&plist->conn)) { + return NULL; + } + queue* h = QUEUE_HEAD(&plist->conn); + QUEUE_REMOVE(h); + return QUEUE_DATA(h, SCliConn, conn); +} +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { + char key[128] = {0}; + tstrncpy(key, ip, strlen(ip)); + tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + + SHashObj* pCache = cache; + SConnList* plist = taosHashGet(pCache, key, strlen(key)); + // list already create before + assert(plist != NULL); + QUEUE_PUSH(&plist->conn, &conn->conn); +} static bool clientReadComplete(SConnBuffer* data) { STransMsgHead head; int32_t headLen = sizeof(head); @@ -206,15 +260,6 @@ static void clientConnCb(uv_connect_t* req, int status) { clientWrite(pConn); } -static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { - // impl later - - return NULL; -} -static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { - // impl later -} 
-static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
   uint64_t et = taosGetTimestampUs();
   uint64_t el = et - pMsg->st;
diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index d096ab7813..4542541043 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
 
 // refers specifically to query or insert timeout
 static void uvHandleActivityTimeout(uv_timer_t* handle) {
-  // impl later
   SConn* conn = handle->data;
+  tDebug("%p timeout since no activity", conn);
 }
 
 static void uvProcessData(SConn* pConn) {
@@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
   SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
 
   // auth here
+  // auth should not do in rpc thread
 
-  int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
-  if (code != 0) {
-    terrno = code;
-    return;
-  }
+  // int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
+  // if (code != 0) {
+  //  terrno = code;
+  //  return;
+  //}
   pHead->code = htonl(pHead->code);
 
   int32_t dlen = 0;
@@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
   } else {
     // impl later
   }
-  rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
+  rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
   rpcMsg.pCont = pHead->content;
   rpcMsg.msgType = pHead->msgType;
   rpcMsg.code = pHead->code;
@@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
       return;
     }
     uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
+
+    uv_timer_stop(conn->pTimer);
+
     uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
   }
 }
diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c
index 6339e58560..4ccbb60cc2 100644
--- a/source/libs/transport/test/rclient.c
+++ b/source/libs/transport/test/rclient.c
@@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
   if (pEpSet) pInfo->epSet = *pEpSet;
 
   rpcFreeCont(pMsg->pCont);
+  // tsem_post(&pInfo->rspSem);
   tsem_post(&pInfo->rspSem);
 }
 
@@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
     // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
     rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
     if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
+    // tsem_wait(&pInfo->rspSem);
     tsem_wait(&pInfo->rspSem);
   }
 
From 930ffd3eb833395f1db1f483f132bec7de1ebb51 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Thu, 20 Jan 2022 23:21:41 +0800
Subject: [PATCH 08/10] refactor rpc

---
 source/libs/transport/src/transCli.c | 69 ++++++++++++++++++++++++----
 1 file changed, 60 insertions(+), 9 deletions(-)

diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index 2b2fcb557f..f2d844f73d 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -26,6 +26,7 @@ typedef struct SCliConn {
   queue    conn;
   char     spi;
   char     secured;
+  uint64_t expireTime;
 } SCliConn;
 
 typedef struct SCliMsg {
@@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
   pthread_t       thread;
   uv_loop_t*      loop;
   uv_async_t*     cliAsync;  //
-  void*           cache;     // conn pool
+  uv_timer_t*     pTimer;
+  void*           cache;  // conn pool
   queue           msg;
   pthread_mutex_t msgMtx;
-  void*           shandle;
+  uint64_t        nextTimeout;  // next timeout
+  void*           shandle;      //
+
 } SCliThrdObj;
 
 typedef struct SClientObj {
@@ -63,6 +67,8 @@ static void* connCacheDestroy(void* cache);
 
 static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
 static void      addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
+// register timer in each thread to clear expire conn
+static void clientTimeoutCb(uv_timer_t* handle);
 // process data read from server, auth/decompress etc
 static void clientProcessData(SCliConn* conn);
 // check whether already read complete packet from server
@@ -84,21 +90,55 @@ static void clientMsgDestroy(SCliMsg* pMsg);
 static void* clientThread(void* arg);
 
 static void clientProcessData(SCliConn* conn) {
+  STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
+  SRpcInfo*      pRpc = pCtx->ahandle;
+  SRpcMsg        rpcMsg;
+
+  rpcMsg.pCont = conn->readBuf.buf;
+  rpcMsg.contLen = conn->readBuf.len;
+  rpcMsg.ahandle = pCtx->ahandle;
+  (pRpc->cfp)(NULL, &rpcMsg, NULL);
   // impl
 }
 static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
 
+static void clientTimeoutCb(uv_timer_t* handle) {
+  SCliThrdObj* pThrd = handle->data;
+  SRpcInfo*    pRpc = pThrd->shandle;
+  int64_t      currentTime = pThrd->nextTimeout;
+
+  SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
+  while (p != NULL) {
+    while (!QUEUE_IS_EMPTY(&p->conn)) {
+      queue*    h = QUEUE_HEAD(&p->conn);
+      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
+      if (c->expireTime < currentTime) {
+        QUEUE_REMOVE(h);
+        clientConnDestroy(c);
+      } else {
+        break;
+      }
+    }
+    p = taosHashIterate((SHashObj*)pThrd->cache, p);
+  }
+
+  pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
+  uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0);
+}
 static void* connCacheCreate(int size) {
   SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
   return false;
 }
 static void* connCacheDestroy(void* cache) {
   SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
-  while (!QUEUE_IS_EMPTY(&connList->conn)) {
-    queue* h = QUEUE_HEAD(&connList->conn);
-    QUEUE_REMOVE(h);
-    SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
-    clientConnDestroy(c);
+  while (connList != NULL) {
+    while (!QUEUE_IS_EMPTY(&connList->conn)) {
+      queue* h = QUEUE_HEAD(&connList->conn);
+      QUEUE_REMOVE(h);
+      SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
+      clientConnDestroy(c);
+    }
+    connList = taosHashIterate((SHashObj*)cache, connList);
   }
   taosHashClear(cache);
 }
@@ -129,8 +169,10 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn)
   tstrncpy(key, ip, strlen(ip));
   tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
 
-  SHashObj*  pCache = cache;
-  SConnList* plist = taosHashGet(pCache, key, strlen(key));
+  STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx;
+  SRpcInfo*      pRpc = ctx->pRpc;
+  conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
+  SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
   // list already create before
   assert(plist != NULL);
   QUEUE_PUSH(&plist->conn, &conn->conn);
@@ -206,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
 }
 static void clientDestroy(uv_handle_t* handle) {
   SCliConn* conn = handle->data;
+  QUEUE_REMOVE(&conn->conn);
   clientConnDestroy(conn);
 }
 
@@ -279,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
     conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
     uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
     conn->writeReq = malloc(sizeof(uv_write_t));
+    QUEUE_INIT(&conn->conn);
 
     conn->connReq.data = conn;
     conn->data = pMsg;
@@ -315,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
 
 static void* clientThread(void* arg) {
   SCliThrdObj* pThrd = (SCliThrdObj*)arg;
+  SRpcInfo*    pRpc = pThrd->shandle;
+  pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
+  uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0);
   uv_run(pThrd->loop, UV_RUN_DEFAULT);
 }
 
@@ -336,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
 
     uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
     pThrd->cliAsync->data = pThrd;
 
+    pThrd->pTimer = malloc(sizeof(uv_timer_t));
+    uv_timer_init(pThrd->loop, pThrd->pTimer);
+    pThrd->shandle = shandle;
+
     int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
     if (err == 0) {
       tDebug("sucess to create tranport-client thread %d", i);

From 6c0842fb05fefde4a9096c6ae413841466be96d5 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Thu, 20 Jan 2022 23:57:21 +0800
Subject: [PATCH 09/10] add index test

---
 source/libs/index/test/indexTests.cc | 114 ++++++++++++++++++++++++---
 1 file changed, 102 insertions(+), 12 deletions(-)

diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc
index 4f3330b7b3..bbcc654ae2 100644
--- a/source/libs/index/test/indexTests.cc
+++ b/source/libs/index/test/indexTests.cc
@@ -82,7 +82,9 @@ class FstReadMemory {
   bool init() {
     char* buf = (char*)calloc(1, sizeof(char) * _size);
     int   nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
-    if (nRead <= 0) { return false; }
+    if (nRead <= 0) {
+      return false;
+    }
     _size = nRead;
     _s = fstSliceCreate((uint8_t*)buf, _size);
     _fst = fstCreate(&_s);
@@ -108,7 +110,9 @@ class FstReadMemory {
     StreamWithState*       st = streamBuilderIntoStream(sb);
     StreamWithStateResult* rt = NULL;
 
-    while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { result.push_back((uint64_t)(rt->out.out)); }
+    while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
+      result.push_back((uint64_t)(rt->out.out));
+    }
     return true;
   }
   bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector& result) {
@@ -184,7 +188,9 @@ void checkFstPerf() {
   delete fw;
 
   FstReadMemory* m = new FstReadMemory(1024 * 64);
-  if (m->init()) { printf("success to init fst read"); }
+  if (m->init()) {
+    printf("success to init fst read");
+  }
   Performance_fstReadRecords(m);
   delete m;
 }
@@ -348,7 +354,9 @@ class TFileObj {
       tfileReaderDestroy(reader_);
       reader_ = NULL;
     }
-    if (writer_ == NULL) { InitWriter(); }
+    if (writer_ == NULL) {
+      InitWriter();
+    }
     return tfileWriterPut(writer_, tv, false);
   }
   bool InitWriter() {
@@ -388,8 +396,12 @@ class TFileObj {
     return tfileReaderSearch(reader_, query, result);
   }
   ~TFileObj() {
-    if (writer_) { tfileWriterDestroy(writer_); }
-    if (reader_) { tfileReaderDestroy(reader_); }
+    if (writer_) {
+      tfileWriterDestroy(writer_);
+    }
+    if (reader_) {
+      tfileReaderDestroy(reader_);
+    }
   }
 
  private:
@@ -912,7 +924,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
 }
 TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
   std::string path = "/tmp/cache_and_tfile";
-  if (index->Init(path) != 0) {}
+  if (index->Init(path) != 0) {
+  }
 
   std::thread threads[NUM_OF_THREAD];
   for (int i = 0; i < NUM_OF_THREAD; i++) {
@@ -927,14 +940,24 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
 
 TEST_F(IndexEnv2, testIndex_restart) {
   std::string path = "/tmp/cache_and_tfile";
-  if (index->Init(path) != 0) {}
+  if (index->Init(path) != 0) {
+  }
+  index->SearchOneTarget("tag1", "Hello", 10);
+  index->SearchOneTarget("tag2", "Test", 10);
+}
+TEST_F(IndexEnv2, testIndex_restart1) {
+  std::string path = "/tmp/cache_and_tfile";
+  if (index->Init(path) != 0) {
+  }
+  index->ReadMultiMillonData("tag1", "coding");
   index->SearchOneTarget("tag1", "Hello", 10);
   index->SearchOneTarget("tag2", "Test", 10);
 }
 
 TEST_F(IndexEnv2, testIndex_read_performance) {
   std::string path = "/tmp/cache_and_tfile";
-  if (index->Init(path) != 0) {}
+  if (index->Init(path) != 0) {
+  }
   index->PutOneTarge("tag1", "Hello", 12);
   index->PutOneTarge("tag1", "Hello", 15);
   index->ReadMultiMillonData("tag1", "Hello");
@@ -943,17 +966,84 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
 }
 TEST_F(IndexEnv2, testIndexMultiTag) {
   std::string path = "/tmp/multi_tag";
-  if (index->Init(path) != 0) {}
+  if (index->Init(path) != 0) {
+  }
   int64_t st = taosGetTimestampUs();
   int32_t num = 1000 * 10000;
   index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
   std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
   // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
 }
-TEST_F(IndexEnv2, testLongComVal) {
+TEST_F(IndexEnv2, testLongComVal1) {
   std::string path = "/tmp/long_colVal";
-  if (index->Init(path) != 0) {}
+  if (index->Init(path) != 0) {
+  }
   // gen colVal by randstr
   std::string randstr = "xxxxxxxxxxxxxxxxx";
   index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
 }
+
+TEST_F(IndexEnv2, testLongComVal2) {
+  std::string path = "/tmp/long_colVal";
+  if (index->Init(path) != 0) {
+  }
+  // gen colVal by randstr
+  std::string randstr = "abcccc fdadfafdafda";
+  index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
+}
+TEST_F(IndexEnv2, testLongComVal3) {
+  std::string path = "/tmp/long_colVal";
+  if (index->Init(path) != 0) {
+  }
+  // gen colVal by randstr
+  std::string randstr = "Yes, coding and coding and coding";
+  index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
+}
+TEST_F(IndexEnv2, testLongComVal4) {
+  std::string path = "/tmp/long_colVal";
+  if (index->Init(path) != 0) {
+  }
+  // gen colVal by randstr
+  std::string randstr = "111111 bac fdadfa";
+  index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
+}
+TEST_F(IndexEnv2, testIndex_read_performance1) {
+  std::string path = "/tmp/cache_and_tfile";
+  if (index->Init(path) != 0) {
+  }
+  index->PutOneTarge("tag1", "Hello", 12);
+  index->PutOneTarge("tag1", "Hello", 15);
+  index->ReadMultiMillonData("tag1", "Hello", 1000);
+  std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
+  assert(3 == index->SearchOne("tag1", "Hello"));
+}
+TEST_F(IndexEnv2, testIndex_read_performance2) {
+  std::string path = "/tmp/cache_and_tfile";
+  if (index->Init(path) != 0) {
+  }
+  index->PutOneTarge("tag1", "Hello", 12);
+  index->PutOneTarge("tag1", "Hello", 15);
+  index->ReadMultiMillonData("tag1", "Hello", 1000 * 10);
+  std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
+  assert(3 == index->SearchOne("tag1", "Hello"));
+}
+TEST_F(IndexEnv2, testIndex_read_performance3) {
+  std::string path = "/tmp/cache_and_tfile";
+  if (index->Init(path) != 0) {
+  }
+  index->PutOneTarge("tag1", "Hello", 12);
+  index->PutOneTarge("tag1", "Hello", 15);
+  index->ReadMultiMillonData("tag1", "Hello", 1000 * 100);
+  std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
+  assert(3 == index->SearchOne("tag1", "Hello"));
+}
+TEST_F(IndexEnv2, testIndex_read_performance4) {
+  std::string path = "/tmp/cache_and_tfile";
+  if (index->Init(path) != 0) {
+  }
+  index->PutOneTarge("tag10", "Hello", 12);
+  index->PutOneTarge("tag12", "Hello", 15);
+  index->ReadMultiMillonData("tag10", "Hello", 1000 * 100);
+  std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
+  assert(3 == index->SearchOne("tag10", "Hello"));
+}

From 616a02f5601b35951f66377a8f0b67c514d4db21 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Fri, 21 Jan 2022 00:01:23 +0800
Subject: [PATCH 10/10] add test

---
 source/libs/transport/src/trans.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c
index cf1e153965..4b490936cc 100644
--- a/source/libs/transport/src/trans.c
+++ b/source/libs/transport/src/trans.c
@@ -70,6 +70,7 @@ int32_t rpcInit(void) {
 
 void rpcCleanup(void) {
   // impl later
+  // return;
 }
 
 #endif
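
Reviewer note on [PATCH 08/10]: the idle-connection handling is the heart of the refactor. Every SCliConn now carries an expireTime, addConnToCache stamps it from pRpc->idleTime when the connection is returned to the pool, and each client thread owns a uv_timer_t whose callback (clientTimeoutCb) sweeps the pool, destroys whatever has expired, and re-arms itself. The listing below is a minimal, self-contained sketch of that pattern only; pool_t, conn_t, evictIdleConns and the 10-second interval are invented for illustration and are not part of transCli.c, which keys connections by "ip:port" in a taosHash of QUEUE lists rather than the flat list used here.

/* sketch of the timer-driven idle-connection eviction used in transCli.c;
 * names below (pool_t, conn_t, evictIdleConns) are illustrative only */
#include <stdint.h>
#include <stdlib.h>
#include <uv.h>

typedef struct conn_s {
  uint64_t       expireTime;  /* absolute deadline in ms, refreshed on reuse */
  struct conn_s* next;
} conn_t;

typedef struct {
  uv_timer_t timer;   /* owned by the I/O thread, like SCliThrdObj.pTimer */
  conn_t*    idle;    /* idle connections added back by the I/O path      */
  uint64_t   idleMs;  /* stand-in for pRpc->idleTime                      */
} pool_t;

static void evictIdleConns(uv_timer_t* handle) {
  pool_t*  pool = handle->data;
  uint64_t now = uv_now(handle->loop);

  /* drop every connection whose deadline has passed */
  conn_t** pp = &pool->idle;
  while (*pp != NULL) {
    if ((*pp)->expireTime < now) {
      conn_t* dead = *pp;
      *pp = dead->next;
      free(dead);  /* the real code calls clientConnDestroy() here */
    } else {
      pp = &(*pp)->next;
    }
  }
  /* re-arm for the next sweep, exactly like clientTimeoutCb re-starts itself */
  uv_timer_start(handle, evictIdleConns, pool->idleMs, 0);
}

int main(void) {
  pool_t pool = {.idle = NULL, .idleMs = 10 * 1000};
  uv_timer_init(uv_default_loop(), &pool.timer);
  pool.timer.data = &pool;
  uv_timer_start(&pool.timer, evictIdleConns, pool.idleMs, 0);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);  /* runs until stopped */
}

One design consequence of the patch worth noting: because addConnToCache pushes a reused connection to the tail of its per-endpoint queue with a freshly stamped expireTime, clientTimeoutCb can stop scanning a queue at the first unexpired head entry; the flat list in the sketch scans everything, which is simpler but O(n) per tick.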