diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7e5874d8cd..f197e72ec5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -23,6 +23,8 @@ typedef struct SCliConn { uv_write_t* writeReq; void* data; queue conn; + char spi; + char secured; } SCliConn; typedef struct SCliMsg { SRpcReqContext* context; @@ -47,6 +49,10 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; +// conn pool +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); + static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void clientWriteCb(uv_write_t* req, int status); @@ -93,6 +99,16 @@ static void clientWriteCb(uv_write_t* req, int status) { uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); // impl later } + +static void clientWrite(SCliConn* pConn) { + SCliMsg* pMsg = pConn->data; + SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont); + int msgLen = rpcMsgLenFromCont(pMsg->context->contLen); + char* msg = (char*)(pHead); + + uv_buf_t wb = uv_buf_init(msg, msgLen); + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); +} static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; @@ -105,8 +121,8 @@ static void clientConnCb(uv_connect_t* req, int status) { SCliMsg* pMsg = pConn->data; SEpSet* pEpSet = &pMsg->context->epSet; SRpcMsg rpcMsg; - rpcMsg.ahandle = pMsg->context->ahandle; - rpcMsg.pCont = NULL; + // rpcMsg.ahandle = pMsg->context->ahandle; + // rpcMsg.pCont = NULL; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; @@ -119,15 +135,16 @@ static void clientConnCb(uv_connect_t* req, int status) { return; } assert(pConn->stream == req->handle); - - uv_buf_t wb; - uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { // impl later + return NULL; } +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { + // impl later +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SEpSet* pEpSet = &pMsg->context->epSet; @@ -140,11 +157,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { + // impl later conn->data = pMsg; conn->writeReq->data = conn; - uv_buf_t wb; - uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); - // impl later + clientWrite(conn); + // uv_buf_t wb; + // uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); } else { SCliConn* conn = malloc(sizeof(SCliConn));