add client
This commit is contained in:
parent
7172921886
commit
c063ea6913
|
@ -105,35 +105,98 @@ typedef struct {
|
||||||
SRpcInfo* pRpc; // associated SRpcInfo
|
SRpcInfo* pRpc; // associated SRpcInfo
|
||||||
SEpSet epSet; // ip list provided by app
|
SEpSet epSet; // ip list provided by app
|
||||||
void* ahandle; // handle provided by app
|
void* ahandle; // handle provided by app
|
||||||
struct SRpcConn* pConn; // pConn allocated
|
// struct SRpcConn* pConn; // pConn allocated
|
||||||
tmsg_t msgType; // message type
|
tmsg_t msgType; // message type
|
||||||
uint8_t* pCont; // content provided by app
|
uint8_t* pCont; // content provided by app
|
||||||
int32_t contLen; // content length
|
int32_t contLen; // content length
|
||||||
int32_t code; // error code
|
// int32_t code; // error code
|
||||||
int16_t numOfTry; // number of try for different servers
|
// int16_t numOfTry; // number of try for different servers
|
||||||
int8_t oldInUse; // server EP inUse passed by app
|
// int8_t oldInUse; // server EP inUse passed by app
|
||||||
int8_t redirect; // flag to indicate redirect
|
// int8_t redirect; // flag to indicate redirect
|
||||||
int8_t connType; // connection type
|
int8_t connType; // connection type
|
||||||
int64_t rid; // refId returned by taosAddRef
|
int64_t rid; // refId returned by taosAddRef
|
||||||
SRpcMsg* pRsp; // for synchronous API
|
SRpcMsg* pRsp; // for synchronous API
|
||||||
tsem_t* pSem; // for synchronous API
|
tsem_t* pSem; // for synchronous API
|
||||||
SEpSet* pSet; // for synchronous API
|
char* ip;
|
||||||
char msg[0]; // RpcHead starts from here
|
uint32_t port;
|
||||||
|
// SEpSet* pSet; // for synchronous API
|
||||||
} SRpcReqContext;
|
} SRpcReqContext;
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
typedef struct {
|
||||||
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
|
SRpcInfo* pRpc; // associated SRpcInfo
|
||||||
|
SEpSet epSet; // ip list provided by app
|
||||||
|
void* ahandle; // handle provided by app
|
||||||
|
// struct SRpcConn* pConn; // pConn allocated
|
||||||
|
tmsg_t msgType; // message type
|
||||||
|
uint8_t* pCont; // content provided by app
|
||||||
|
int32_t contLen; // content length
|
||||||
|
// int32_t code; // error code
|
||||||
|
// int16_t numOfTry; // number of try for different servers
|
||||||
|
// int8_t oldInUse; // server EP inUse passed by app
|
||||||
|
// int8_t redirect; // flag to indicate redirect
|
||||||
|
int8_t connType; // connection type
|
||||||
|
int64_t rid; // refId returned by taosAddRef
|
||||||
|
SRpcMsg* pRsp; // for synchronous API
|
||||||
|
tsem_t* pSem; // for synchronous API
|
||||||
|
char* ip;
|
||||||
|
uint32_t port;
|
||||||
|
// SEpSet* pSet; // for synchronous API
|
||||||
|
} STransConnCtx;
|
||||||
|
|
||||||
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
|
#pragma pack(push, 1)
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char version : 4; // RPC version
|
||||||
|
char comp : 4; // compression algorithm, 0:no compression 1:lz4
|
||||||
|
char resflag : 2; // reserved bits
|
||||||
|
char spi : 3; // security parameter index
|
||||||
|
char encrypt : 3; // encrypt algorithm, 0: no encryption
|
||||||
|
|
||||||
|
uint32_t code; // del later
|
||||||
|
uint32_t msgType;
|
||||||
|
int32_t msgLen;
|
||||||
|
uint8_t content[0]; // message body starts from here
|
||||||
|
} STransMsgHead;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t reserved;
|
||||||
|
int32_t contLen;
|
||||||
|
} STransCompMsg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t timeStamp;
|
||||||
|
uint8_t auth[TSDB_AUTH_LEN];
|
||||||
|
} STransDigestMsg;
|
||||||
|
|
||||||
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
|
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
|
||||||
|
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
|
||||||
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
|
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
|
||||||
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
|
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
|
||||||
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
|
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
|
||||||
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
||||||
#define rpcIsReq(type) (type & 1U)
|
#define rpcIsReq(type) (type & 1U)
|
||||||
|
|
||||||
|
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||||
|
|
||||||
|
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg))
|
||||||
|
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
||||||
|
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
||||||
|
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
||||||
|
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
||||||
|
#define transIsReq(type) (type & 1U)
|
||||||
|
|
||||||
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
|
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
|
||||||
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
|
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
|
||||||
|
|
||||||
|
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
|
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
||||||
|
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
|
||||||
|
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -45,6 +45,9 @@ extern "C" {
|
||||||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
|
|
||||||
|
void taosCloseServer(void* arg);
|
||||||
|
void taosCloseClient(void* arg);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int sessions; // number of sessions allowed
|
int sessions; // number of sessions allowed
|
||||||
int numOfThreads; // number of threads to process incoming messages
|
int numOfThreads; // number of threads to process incoming messages
|
||||||
|
|
|
@ -24,8 +24,9 @@ typedef struct SConnBuffer {
|
||||||
int left;
|
int left;
|
||||||
} SConnBuffer;
|
} SConnBuffer;
|
||||||
|
|
||||||
void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
|
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
|
||||||
taosInitServer, taosInitClient};
|
taosInitServer, taosInitClient};
|
||||||
|
void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient};
|
||||||
|
|
||||||
void* rpcOpen(const SRpcInit* pInit) {
|
void* rpcOpen(const SRpcInit* pInit) {
|
||||||
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
||||||
|
@ -38,11 +39,15 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
|
|
||||||
return pRpc;
|
return pRpc;
|
||||||
}
|
}
|
||||||
void rpcClose(void* arg) { return; }
|
void rpcClose(void* arg) {
|
||||||
|
SRpcInfo* pRpc = (SRpcInfo*)arg;
|
||||||
|
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
|
||||||
|
return;
|
||||||
|
}
|
||||||
void* rpcMallocCont(int contLen) {
|
void* rpcMallocCont(int contLen) {
|
||||||
int size = contLen + RPC_MSG_OVERHEAD;
|
int size = contLen + RPC_MSG_OVERHEAD;
|
||||||
|
|
||||||
|
@ -53,7 +58,7 @@ void* rpcMallocCont(int contLen) {
|
||||||
} else {
|
} else {
|
||||||
tTrace("malloc mem:%p size:%d", start, size);
|
tTrace("malloc mem:%p size:%d", start, size);
|
||||||
}
|
}
|
||||||
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
return start + sizeof(STransMsgHead);
|
||||||
}
|
}
|
||||||
void rpcFreeCont(void* cont) { return; }
|
void rpcFreeCont(void* cont) { return; }
|
||||||
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
||||||
|
|
|
@ -26,8 +26,10 @@ typedef struct SCliConn {
|
||||||
char spi;
|
char spi;
|
||||||
char secured;
|
char secured;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
SRpcReqContext* context;
|
STransConnCtx* ctx;
|
||||||
|
SRpcMsg msg;
|
||||||
queue q;
|
queue q;
|
||||||
uint64_t st;
|
uint64_t st;
|
||||||
} SCliMsg;
|
} SCliMsg;
|
||||||
|
@ -74,6 +76,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
uv_close((uv_handle_t*)handle, clientDestroy);
|
uv_close((uv_handle_t*)handle, clientDestroy);
|
||||||
}
|
}
|
||||||
|
@ -101,9 +104,11 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientWrite(SCliConn* pConn) {
|
static void clientWrite(SCliConn* pConn) {
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pCliMsg = pConn->data;
|
||||||
SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont);
|
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
|
||||||
int msgLen = rpcMsgLenFromCont(pMsg->context->contLen);
|
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||||
|
|
||||||
|
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
||||||
char* msg = (char*)(pHead);
|
char* msg = (char*)(pHead);
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init(msg, msgLen);
|
uv_buf_t wb = uv_buf_init(msg, msgLen);
|
||||||
|
@ -119,16 +124,16 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
SEpSet* pEpSet = &pMsg->context->epSet;
|
STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx;
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
|
|
||||||
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
|
||||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
// call user fp later
|
// call user fp later
|
||||||
tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status));
|
tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
|
||||||
SRpcInfo* pRpc = pMsg->context->pRpc;
|
SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
|
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||||
uv_close((uv_handle_t*)req->handle, clientDestroy);
|
uv_close((uv_handle_t*)req->handle, clientDestroy);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -146,22 +151,18 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
SEpSet* pEpSet = &pMsg->context->epSet;
|
uint64_t et = taosGetTimestampUs();
|
||||||
|
uint64_t el = et - pMsg->st;
|
||||||
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
|
||||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
|
||||||
|
|
||||||
uint64_t el = taosGetTimestampUs() - pMsg->st;
|
|
||||||
tDebug("msg tran time cost: %" PRIu64 "", el);
|
tDebug("msg tran time cost: %" PRIu64 "", el);
|
||||||
|
et = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
// impl later
|
// impl later
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
conn->writeReq->data = conn;
|
conn->writeReq->data = conn;
|
||||||
clientWrite(conn);
|
clientWrite(conn);
|
||||||
// uv_buf_t wb;
|
|
||||||
// uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb);
|
|
||||||
} else {
|
} else {
|
||||||
SCliConn* conn = malloc(sizeof(SCliConn));
|
SCliConn* conn = malloc(sizeof(SCliConn));
|
||||||
|
|
||||||
|
@ -171,23 +172,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
||||||
conn->connReq.data = conn;
|
conn->connReq.data = conn;
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
uv_ip4_addr(fqdn, port, &addr);
|
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||||
// handle error in callback if fail to connect
|
// handle error in callback if fail to connect
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
||||||
|
|
||||||
// SRpcMsg rpcMsg;
|
|
||||||
// SEpSet* pEpSet = &pMsg->context->epSet;
|
|
||||||
// SRpcInfo* pRpc = pMsg->context->pRpc;
|
|
||||||
//// rpcMsg.ahandle = pMsg->context->ahandle;
|
|
||||||
// rpcMsg.pCont = NULL;
|
|
||||||
// rpcMsg.ahandle = pMsg->context->ahandle;
|
|
||||||
// uint64_t el1 = taosGetTimestampUs() - et;
|
|
||||||
// tError("msg tran back first: time cost: %" PRIu64 "", el1);
|
|
||||||
// et = taosGetTimestampUs();
|
|
||||||
//(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
|
|
||||||
// uint64_t el2 = taosGetTimestampUs() - et;
|
|
||||||
// tError("msg tran back second: time cost: %" PRIu64 "", el2);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientAsyncCb(uv_async_t* handle) {
|
static void clientAsyncCb(uv_async_t* handle) {
|
||||||
|
@ -204,7 +193,8 @@ static void clientAsyncCb(uv_async_t* handle) {
|
||||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
queue* h = QUEUE_HEAD(&wq);
|
queue* h = QUEUE_HEAD(&wq);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
pMsg = QUEUE_DATA(h, SCliMsg, q);
|
|
||||||
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
clientHandleReq(pMsg, pThrd);
|
clientHandleReq(pMsg, pThrd);
|
||||||
count++;
|
count++;
|
||||||
if (count >= 2) {
|
if (count >= 2) {
|
||||||
|
@ -220,6 +210,7 @@ static void* clientThread(void* arg) {
|
||||||
|
|
||||||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||||
SClientObj* cli = calloc(1, sizeof(SClientObj));
|
SClientObj* cli = calloc(1, sizeof(SClientObj));
|
||||||
|
|
||||||
memcpy(cli->label, label, strlen(label));
|
memcpy(cli->label, label, strlen(label));
|
||||||
cli->numOfThreads = numOfThreads;
|
cli->numOfThreads = numOfThreads;
|
||||||
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
|
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
|
||||||
|
@ -244,22 +235,31 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
}
|
}
|
||||||
return cli;
|
return cli;
|
||||||
}
|
}
|
||||||
|
void taosCloseClient(void* arg) {
|
||||||
|
// impl later
|
||||||
|
SClientObj* cli = arg;
|
||||||
|
}
|
||||||
|
|
||||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
// impl later
|
// impl later
|
||||||
|
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
|
||||||
|
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
||||||
|
|
||||||
SRpcReqContext* pContext;
|
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||||
pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
|
|
||||||
pContext->ahandle = pMsg->ahandle;
|
pCtx->pRpc = (SRpcInfo*)shandle;
|
||||||
pContext->pRpc = (SRpcInfo*)shandle;
|
pCtx->ahandle = pMsg->ahandle;
|
||||||
pContext->epSet = *pEpSet;
|
// pContext->contLen = len;
|
||||||
pContext->contLen = len;
|
// pContext->pCont = pMsg->pCont;
|
||||||
pContext->pCont = pMsg->pCont;
|
pCtx->msgType = pMsg->msgType;
|
||||||
pContext->msgType = pMsg->msgType;
|
pCtx->ip = strdup(ip);
|
||||||
pContext->oldInUse = pEpSet->inUse;
|
pCtx->port = port;
|
||||||
|
// pContext->epSet = *pEpSet;
|
||||||
|
// pContext->oldInUse = pEpSet->inUse;
|
||||||
|
|
||||||
assert(pRpc->connType == TAOS_CONN_CLIENT);
|
assert(pRpc->connType == TAOS_CONN_CLIENT);
|
||||||
// atomic or not
|
// atomic or not
|
||||||
|
@ -267,14 +267,15 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
if (pRpc->index++ >= pRpc->numOfThreads) {
|
if (pRpc->index++ >= pRpc->numOfThreads) {
|
||||||
pRpc->index = 0;
|
pRpc->index = 0;
|
||||||
}
|
}
|
||||||
SCliMsg* msg = malloc(sizeof(SCliMsg));
|
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
||||||
msg->context = pContext;
|
cliMsg->ctx = pCtx;
|
||||||
msg->st = taosGetTimestampUs();
|
cliMsg->msg = *pMsg;
|
||||||
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
||||||
|
|
||||||
pthread_mutex_lock(&thrd->msgMtx);
|
pthread_mutex_lock(&thrd->msgMtx);
|
||||||
QUEUE_PUSH(&thrd->msg, &msg->q);
|
QUEUE_PUSH(&thrd->msg, &cliMsg->q);
|
||||||
pthread_mutex_unlock(&thrd->msgMtx);
|
pthread_mutex_unlock(&thrd->msgMtx);
|
||||||
|
|
||||||
uv_async_send(thrd->cliAsync);
|
uv_async_send(thrd->cliAsync);
|
||||||
|
|
|
@ -30,6 +30,20 @@ int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||||
|
T_MD5_CTX context;
|
||||||
|
int ret = -1;
|
||||||
|
|
||||||
|
tMD5Init(&context);
|
||||||
|
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||||
|
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
|
||||||
|
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||||
|
tMD5Final(&context);
|
||||||
|
|
||||||
|
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||||
T_MD5_CTX context;
|
T_MD5_CTX context;
|
||||||
|
|
||||||
|
@ -41,6 +55,17 @@ void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||||
|
|
||||||
memcpy(pAuth, context.digest, sizeof(context.digest));
|
memcpy(pAuth, context.digest, sizeof(context.digest));
|
||||||
}
|
}
|
||||||
|
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
||||||
|
T_MD5_CTX context;
|
||||||
|
|
||||||
|
tMD5Init(&context);
|
||||||
|
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||||
|
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
|
||||||
|
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
||||||
|
tMD5Final(&context);
|
||||||
|
|
||||||
|
memcpy(pAuth, context.digest, sizeof(context.digest));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
|
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
|
||||||
SRpcHead* pHead = rpcHeadFromCont(pCont);
|
SRpcHead* pHead = rpcHeadFromCont(pCont);
|
||||||
|
@ -81,6 +106,53 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
|
||||||
return finalLen;
|
return finalLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
|
||||||
|
// SRpcHead* pHead = rpcHeadFromCont(pCont);
|
||||||
|
bool succ = false;
|
||||||
|
int overhead = sizeof(STransCompMsg);
|
||||||
|
if (!NEEDTO_COMPRESSS_MSG(len)) {
|
||||||
|
return succ;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* buf = malloc(len + overhead + 8); // 8 extra bytes
|
||||||
|
if (buf == NULL) {
|
||||||
|
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
|
||||||
|
*flen = len;
|
||||||
|
return succ;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
|
||||||
|
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
|
||||||
|
/*
|
||||||
|
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
||||||
|
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
||||||
|
*/
|
||||||
|
if (clen > 0 && clen < len - overhead) {
|
||||||
|
STransCompMsg* pComp = (STransCompMsg*)msg;
|
||||||
|
pComp->reserved = 0;
|
||||||
|
pComp->contLen = htonl(len);
|
||||||
|
memcpy(msg + overhead, buf, clen);
|
||||||
|
|
||||||
|
tDebug("compress rpc msg, before:%d, after:%d", len, clen);
|
||||||
|
*flen = clen + overhead;
|
||||||
|
succ = true;
|
||||||
|
} else {
|
||||||
|
*flen = len;
|
||||||
|
succ = false;
|
||||||
|
}
|
||||||
|
free(buf);
|
||||||
|
return succ;
|
||||||
|
}
|
||||||
|
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
|
||||||
|
// impl later
|
||||||
|
return false;
|
||||||
|
STransCompMsg* pComp = (STransCompMsg*)msg;
|
||||||
|
|
||||||
|
int overhead = sizeof(STransCompMsg);
|
||||||
|
int clen = 0;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
|
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
|
||||||
int overhead = sizeof(SRpcComp);
|
int overhead = sizeof(SRpcComp);
|
||||||
SRpcHead* pNewHead = NULL;
|
SRpcHead* pNewHead = NULL;
|
||||||
|
|
|
@ -107,24 +107,24 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
||||||
SConn* conn = handle->data;
|
SConn* conn = handle->data;
|
||||||
SConnBuffer* pBuf = &conn->connBuf;
|
SConnBuffer* pBuf = &conn->connBuf;
|
||||||
if (pBuf->cap == 0) {
|
if (pBuf->cap == 0) {
|
||||||
pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
|
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
|
||||||
pBuf->len = 0;
|
pBuf->len = 0;
|
||||||
pBuf->cap = CAPACITY;
|
pBuf->cap = CAPACITY;
|
||||||
pBuf->left = -1;
|
pBuf->left = -1;
|
||||||
|
|
||||||
buf->base = pBuf->buf + RPC_RESERVE_SIZE;
|
buf->base = pBuf->buf;
|
||||||
buf->len = CAPACITY;
|
buf->len = CAPACITY;
|
||||||
} else {
|
} else {
|
||||||
if (pBuf->len >= pBuf->cap) {
|
if (pBuf->len >= pBuf->cap) {
|
||||||
if (pBuf->left == -1) {
|
if (pBuf->left == -1) {
|
||||||
pBuf->cap *= 2;
|
pBuf->cap *= 2;
|
||||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
|
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
|
||||||
} else if (pBuf->len + pBuf->left > pBuf->cap) {
|
} else if (pBuf->len + pBuf->left > pBuf->cap) {
|
||||||
pBuf->cap = pBuf->len + pBuf->left;
|
pBuf->cap = pBuf->len + pBuf->left;
|
||||||
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
|
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
|
buf->base = pBuf->buf + pBuf->len;
|
||||||
buf->len = pBuf->cap - pBuf->len;
|
buf->len = pBuf->cap - pBuf->len;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,11 +133,12 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
||||||
//
|
//
|
||||||
static bool readComplete(SConnBuffer* data) {
|
static bool readComplete(SConnBuffer* data) {
|
||||||
// TODO(yihao): handle pipeline later
|
// TODO(yihao): handle pipeline later
|
||||||
SRpcHead rpcHead;
|
// SRpcHead rpcHead;
|
||||||
int32_t headLen = sizeof(rpcHead);
|
STransMsgHead head;
|
||||||
|
int32_t headLen = sizeof(head);
|
||||||
if (data->len >= headLen) {
|
if (data->len >= headLen) {
|
||||||
memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen);
|
memcpy((char*)&head, data->buf, headLen);
|
||||||
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
|
||||||
if (msgLen > data->len) {
|
if (msgLen > data->len) {
|
||||||
data->left = msgLen - data->len;
|
data->left = msgLen - data->len;
|
||||||
return false;
|
return false;
|
||||||
|
@ -150,20 +151,20 @@ static bool readComplete(SConnBuffer* data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvDoProcess(SRecvInfo* pRecv) {
|
static void uvDoProcess(SRecvInfo* pRecv) {
|
||||||
SRpcHead* pHead = (SRpcHead*)pRecv->msg;
|
// impl later
|
||||||
|
STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
|
||||||
SConn* pConn = pRecv->thandle;
|
SConn* pConn = pRecv->thandle;
|
||||||
|
|
||||||
tDump(pRecv->msg, pRecv->msgLen);
|
tDump(pRecv->msg, pRecv->msgLen);
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
SRpcReqContext* pContest;
|
// SRpcReqContext* pContest;
|
||||||
|
|
||||||
// do auth and check
|
// do auth and check
|
||||||
}
|
}
|
||||||
|
|
||||||
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
|
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
|
||||||
SRpcHead* pHead = (SRpcHead*)msg;
|
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||||
|
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
|
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
|
||||||
|
@ -224,7 +225,7 @@ static void uvProcessData(SConn* pConn) {
|
||||||
SRecvInfo info;
|
SRecvInfo info;
|
||||||
SRecvInfo* p = &info;
|
SRecvInfo* p = &info;
|
||||||
SConnBuffer* pBuf = &pConn->connBuf;
|
SConnBuffer* pBuf = &pConn->connBuf;
|
||||||
p->msg = pBuf->buf + RPC_RESERVE_SIZE;
|
p->msg = pBuf->buf;
|
||||||
p->msgLen = pBuf->len;
|
p->msgLen = pBuf->len;
|
||||||
p->ip = 0;
|
p->ip = 0;
|
||||||
p->port = 0;
|
p->port = 0;
|
||||||
|
@ -233,11 +234,10 @@ static void uvProcessData(SConn* pConn) {
|
||||||
p->chandle = NULL;
|
p->chandle = NULL;
|
||||||
|
|
||||||
//
|
//
|
||||||
SRpcHead* pHead = (SRpcHead*)p->msg;
|
STransMsgHead* pHead = (STransMsgHead*)p->msg;
|
||||||
assert(rpcIsReq(pHead->msgType));
|
assert(transIsReq(pHead->msgType));
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
|
||||||
pConn->ahandle = (void*)pHead->ahandle;
|
|
||||||
// auth here
|
// auth here
|
||||||
|
|
||||||
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
|
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
|
||||||
|
@ -247,14 +247,19 @@ static void uvProcessData(SConn* pConn) {
|
||||||
}
|
}
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
|
|
||||||
|
int32_t dlen = 0;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
if (transDecompressMsg(NULL, 0, NULL)) {
|
||||||
pHead = rpcDecompressRpcMsg(pHead);
|
// add compress later
|
||||||
|
// pHead = rpcDecompressRpcMsg(pHead);
|
||||||
|
} else {
|
||||||
|
// impl later
|
||||||
|
}
|
||||||
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
|
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
|
||||||
rpcMsg.pCont = pHead->content;
|
rpcMsg.pCont = pHead->content;
|
||||||
rpcMsg.msgType = pHead->msgType;
|
rpcMsg.msgType = pHead->msgType;
|
||||||
rpcMsg.code = pHead->code;
|
rpcMsg.code = pHead->code;
|
||||||
rpcMsg.ahandle = pConn->ahandle;
|
rpcMsg.ahandle = NULL;
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
|
|
||||||
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
||||||
|
@ -423,7 +428,7 @@ void* workerThread(void* arg) {
|
||||||
uv_loop_init(pThrd->loop);
|
uv_loop_init(pThrd->loop);
|
||||||
|
|
||||||
// SRpcInfo* pRpc = pThrd->shandle;
|
// SRpcInfo* pRpc = pThrd->shandle;
|
||||||
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
|
uv_pipe_init(pThrd->loop, pThrd->pipe, 0);
|
||||||
uv_pipe_open(pThrd->pipe, pThrd->fd);
|
uv_pipe_open(pThrd->pipe, pThrd->fd);
|
||||||
|
|
||||||
pThrd->pipe->data = pThrd;
|
pThrd->pipe->data = pThrd;
|
||||||
|
@ -522,6 +527,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
|
|
||||||
return srv;
|
return srv;
|
||||||
}
|
}
|
||||||
|
void taosCloseServer(void* arg) {
|
||||||
|
// impl later
|
||||||
|
SServerObj* srv = arg;
|
||||||
|
}
|
||||||
|
|
||||||
void rpcSendResponse(const SRpcMsg* pMsg) {
|
void rpcSendResponse(const SRpcMsg* pMsg) {
|
||||||
SConn* pConn = pMsg->handle;
|
SConn* pConn = pMsg->handle;
|
||||||
|
|
|
@ -0,0 +1,207 @@
|
||||||
|
#include <assert.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
#include "task.h"
|
||||||
|
#include <uv.h>
|
||||||
|
|
||||||
|
#define NUM_OF_THREAD 1
|
||||||
|
#define TIMEOUT 10000
|
||||||
|
|
||||||
|
typedef struct SThreadObj {
|
||||||
|
pthread_t thread;
|
||||||
|
uv_pipe_t *pipe;
|
||||||
|
uv_loop_t *loop;
|
||||||
|
uv_async_t *workerAsync; //
|
||||||
|
int fd;
|
||||||
|
} SThreadObj;
|
||||||
|
|
||||||
|
typedef struct SServerObj {
|
||||||
|
uv_tcp_t server;
|
||||||
|
uv_loop_t *loop;
|
||||||
|
int workerIdx;
|
||||||
|
int numOfThread;
|
||||||
|
SThreadObj **pThreadObj;
|
||||||
|
uv_pipe_t **pipe;
|
||||||
|
} SServerObj;
|
||||||
|
|
||||||
|
typedef struct SConnCtx {
|
||||||
|
uv_tcp_t *pClient;
|
||||||
|
uv_timer_t *pTimer;
|
||||||
|
uv_async_t *pWorkerAsync;
|
||||||
|
int ref;
|
||||||
|
} SConnCtx;
|
||||||
|
|
||||||
|
void echo_write(uv_write_t *req, int status) {
|
||||||
|
if (status < 0) {
|
||||||
|
fprintf(stderr, "Write error %s\n", uv_err_name(status));
|
||||||
|
}
|
||||||
|
printf("write data to client\n");
|
||||||
|
free(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||||
|
|
||||||
|
SConnCtx *pConn = container_of(client, SConnCtx, pClient);
|
||||||
|
pConn->ref += 1;
|
||||||
|
printf("read data %d\n", nread, buf->base, buf->len);
|
||||||
|
if (nread > 0) {
|
||||||
|
uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t));
|
||||||
|
// dispatch request to database other process thread
|
||||||
|
// just write out
|
||||||
|
uv_buf_t write_out;
|
||||||
|
write_out.base = buf->base;
|
||||||
|
write_out.len = nread;
|
||||||
|
uv_write((uv_write_t *)req, client, &write_out, 1, echo_write);
|
||||||
|
free(buf->base);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nread < 0) {
|
||||||
|
if (nread != UV_EOF)
|
||||||
|
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
|
||||||
|
uv_close((uv_handle_t *)client, NULL);
|
||||||
|
}
|
||||||
|
free(buf->base);
|
||||||
|
}
|
||||||
|
|
||||||
|
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
|
||||||
|
buf->base = malloc(suggested_size);
|
||||||
|
buf->len = suggested_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_new_connection(uv_stream_t *s, int status) {
|
||||||
|
if (status == -1) {
|
||||||
|
// error!
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SServerObj *pObj = container_of(s, SServerObj, server);
|
||||||
|
printf("new_connection from client\n");
|
||||||
|
|
||||||
|
uv_tcp_t *client = (uv_tcp_t *)malloc(sizeof(uv_tcp_t));
|
||||||
|
uv_tcp_init(pObj->loop, client);
|
||||||
|
if (uv_accept(s, (uv_stream_t *)client) == 0) {
|
||||||
|
uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
|
||||||
|
uv_buf_t dummy_buf = uv_buf_init("a", 1);
|
||||||
|
// despatch to worker thread
|
||||||
|
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
|
||||||
|
uv_write2(write_req, (uv_stream_t *)&(pObj->pipe[pObj->workerIdx][0]),
|
||||||
|
&dummy_buf, 1, (uv_stream_t *)client, echo_write);
|
||||||
|
} else {
|
||||||
|
uv_close((uv_handle_t *)client, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void child_on_new_connection(uv_stream_t *q, ssize_t nread,
|
||||||
|
const uv_buf_t *buf) {
|
||||||
|
printf("x child_on_new_connection \n");
|
||||||
|
if (nread < 0) {
|
||||||
|
if (nread != UV_EOF)
|
||||||
|
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
|
||||||
|
uv_close((uv_handle_t *)q, NULL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SThreadObj *pObj = (SThreadObj *)container_of(q, struct SThreadObj, pipe);
|
||||||
|
|
||||||
|
uv_pipe_t *pipe = (uv_pipe_t *)q;
|
||||||
|
if (!uv_pipe_pending_count(pipe)) {
|
||||||
|
fprintf(stderr, "No pending count\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_handle_type pending = uv_pipe_pending_type(pipe);
|
||||||
|
assert(pending == UV_TCP);
|
||||||
|
|
||||||
|
SConnCtx *pConn = malloc(sizeof(SConnCtx));
|
||||||
|
|
||||||
|
/* init conn timer*/
|
||||||
|
pConn->pTimer = malloc(sizeof(uv_timer_t));
|
||||||
|
uv_timer_init(pObj->loop, pConn->pTimer);
|
||||||
|
|
||||||
|
pConn->pClient = (uv_tcp_t *)malloc(sizeof(uv_tcp_t));
|
||||||
|
pConn->pWorkerAsync = pObj->workerAsync; // thread safty
|
||||||
|
uv_tcp_init(pObj->loop, pConn->pClient);
|
||||||
|
|
||||||
|
if (uv_accept(q, (uv_stream_t *)(pConn->pClient)) == 0) {
|
||||||
|
uv_os_fd_t fd;
|
||||||
|
uv_fileno((const uv_handle_t *)pConn->pClient, &fd);
|
||||||
|
fprintf(stderr, "Worker Accepted fd %d\n", fd);
|
||||||
|
uv_timer_start(pConn->pTimer, timeOutCallBack, TIMEOUT, 0);
|
||||||
|
uv_read_start((uv_stream_t *)(pConn->pClient), alloc_buffer, echo_read);
|
||||||
|
} else {
|
||||||
|
uv_timer_stop(pConn->pTimer);
|
||||||
|
free(pConn->pTimer);
|
||||||
|
uv_close((uv_handle_t *)pConn->pClient, NULL);
|
||||||
|
free(pConn->pClient);
|
||||||
|
free(pConn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void workerAsyncCallback(uv_async_t *handle) {
|
||||||
|
SThreadObj *pObj = container_of(handle, SThreadObj, workerAsync);
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
void *worker_thread(void *arg) {
|
||||||
|
SThreadObj *pObj = (SThreadObj *)arg;
|
||||||
|
int fd = pObj->fd;
|
||||||
|
pObj->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t));
|
||||||
|
uv_loop_init(pObj->loop);
|
||||||
|
|
||||||
|
uv_pipe_init(pObj->loop, pObj->pipe, 1);
|
||||||
|
uv_pipe_open(pObj->pipe, fd);
|
||||||
|
|
||||||
|
pObj->workerAsync = malloc(sizeof(uv_async_t));
|
||||||
|
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCallback);
|
||||||
|
uv_read_start((uv_stream_t *)pObj->pipe, alloc_buffer,
|
||||||
|
child_on_new_connection);
|
||||||
|
|
||||||
|
uv_run(pObj->loop, UV_RUN_DEFAULT);
|
||||||
|
}
|
||||||
|
int main() {
|
||||||
|
|
||||||
|
SServerObj *server = calloc(1, sizeof(SServerObj));
|
||||||
|
server->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t));
|
||||||
|
server->numOfThread = NUM_OF_THREAD;
|
||||||
|
server->workerIdx = 0;
|
||||||
|
server->pThreadObj =
|
||||||
|
(SThreadObj **)calloc(server->numOfThread, sizeof(SThreadObj *));
|
||||||
|
server->pipe = (uv_pipe_t **)calloc(server->numOfThread, sizeof(uv_pipe_t *));
|
||||||
|
|
||||||
|
uv_loop_init(server->loop);
|
||||||
|
|
||||||
|
for (int i = 0; i < server->numOfThread; i++) {
|
||||||
|
server->pThreadObj[i] = (SThreadObj *)calloc(1, sizeof(SThreadObj));
|
||||||
|
server->pipe[i] = (uv_pipe_t *)calloc(2, sizeof(uv_pipe_t));
|
||||||
|
int fds[2];
|
||||||
|
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE,
|
||||||
|
UV_NONBLOCK_PIPE) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
uv_pipe_init(server->loop, &(server->pipe[i][0]), 1);
|
||||||
|
uv_pipe_open(&(server->pipe[i][0]), fds[1]); // init write
|
||||||
|
|
||||||
|
server->pThreadObj[i]->fd = fds[0];
|
||||||
|
server->pThreadObj[i]->pipe = &(server->pipe[i][1]); // init read
|
||||||
|
int err = pthread_create(&(server->pThreadObj[i]->thread), NULL,
|
||||||
|
worker_thread, (void *)(server->pThreadObj[i]));
|
||||||
|
if (err == 0) {
|
||||||
|
printf("thread %d create\n", i);
|
||||||
|
} else {
|
||||||
|
printf("thread %d create failed", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_tcp_init(server->loop, &server->server);
|
||||||
|
struct sockaddr_in bind_addr;
|
||||||
|
uv_ip4_addr("0.0.0.0", 7000, &bind_addr);
|
||||||
|
uv_tcp_bind(&server->server, (const struct sockaddr *)&bind_addr, 0);
|
||||||
|
int err = 0;
|
||||||
|
if ((err = uv_listen((uv_stream_t *)&server->server, 128,
|
||||||
|
on_new_connection)) != 0) {
|
||||||
|
fprintf(stderr, "Listen error %s\n", uv_err_name(err));
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
uv_run(server->loop, UV_RUN_DEFAULT);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue