opt rpc
This commit is contained in:
parent
10d378b4a6
commit
d05b4ebb7d
|
@ -7,8 +7,7 @@
|
||||||
*
|
*
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
* FITNESS FOR A PARTICULAR PURPOSE. *
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
@ -211,6 +210,7 @@ typedef struct SConnBuffer {
|
||||||
char* buf;
|
char* buf;
|
||||||
int len;
|
int len;
|
||||||
int cap;
|
int cap;
|
||||||
|
int left;
|
||||||
int total;
|
int total;
|
||||||
} SConnBuffer;
|
} SConnBuffer;
|
||||||
|
|
||||||
|
@ -282,6 +282,8 @@ int transClearBuffer(SConnBuffer* buf);
|
||||||
int transDestroyBuffer(SConnBuffer* buf);
|
int transDestroyBuffer(SConnBuffer* buf);
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||||
bool transReadComplete(SConnBuffer* connBuf);
|
bool transReadComplete(SConnBuffer* connBuf);
|
||||||
|
int transResetBuffer(SConnBuffer* connBuf);
|
||||||
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream);
|
int transSetConnOption(uv_tcp_t* stream);
|
||||||
|
|
||||||
|
|
|
@ -17,10 +17,10 @@
|
||||||
#ifdef USE_UV
|
#ifdef USE_UV
|
||||||
#include <uv.h>
|
#include <uv.h>
|
||||||
#endif
|
#endif
|
||||||
#include "zlib.h"
|
|
||||||
#include "thttp.h"
|
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "thttp.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "zlib.h"
|
||||||
|
|
||||||
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
|
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
|
||||||
EHttpCompFlag flag) {
|
EHttpCompFlag flag) {
|
||||||
|
|
|
@ -323,7 +323,8 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
STransMsgHead* pHead = NULL;
|
||||||
|
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
|
@ -366,7 +367,6 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// buf's mem alread translated to transMsg.pCont
|
// buf's mem alread translated to transMsg.pCont
|
||||||
transClearBuffer(&conn->readBuf);
|
|
||||||
if (!CONN_NO_PERSIST_BY_APP(conn)) {
|
if (!CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
transMsg.info.handle = (void*)conn->refId;
|
transMsg.info.handle = (void*)conn->refId;
|
||||||
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
@ -636,6 +636,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
||||||
transReqQueueInit(&conn->wreqQueue);
|
transReqQueueInit(&conn->wreqQueue);
|
||||||
|
|
||||||
transQueueInit(&conn->cliMsgs, NULL);
|
transQueueInit(&conn->cliMsgs, NULL);
|
||||||
|
|
||||||
|
transInitBuffer(&conn->readBuf);
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
conn->hostThrd = pThrd;
|
conn->hostThrd = pThrd;
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
|
@ -651,8 +653,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
QUEUE_REMOVE(&conn->q);
|
QUEUE_REMOVE(&conn->q);
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||||
conn->refId = -1;
|
transDestroyBuffer(&conn->readBuf);
|
||||||
|
|
||||||
|
conn->refId = -1;
|
||||||
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
||||||
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
|
@ -678,7 +681,6 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
transReqQueueClear(&conn->wreqQueue);
|
transReqQueueClear(&conn->wreqQueue);
|
||||||
|
|
||||||
transDestroyBuffer(&conn->readBuf);
|
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
}
|
}
|
||||||
static bool cliHandleNoResp(SCliConn* conn) {
|
static bool cliHandleNoResp(SCliConn* conn) {
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
|
#define BUFFER_CAP 4096
|
||||||
|
|
||||||
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
static int32_t refMgt;
|
static int32_t refMgt;
|
||||||
|
@ -111,12 +113,56 @@ int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
int transInitBuffer(SConnBuffer* buf) {
|
int transInitBuffer(SConnBuffer* buf) {
|
||||||
transClearBuffer(buf);
|
buf->cap = BUFFER_CAP;
|
||||||
|
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
|
||||||
|
buf->left = -1;
|
||||||
|
buf->len = 0;
|
||||||
|
buf->total = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
int transDestroyBuffer(SConnBuffer* buf) {
|
||||||
|
taosMemoryFree(buf->buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int transClearBuffer(SConnBuffer* buf) {
|
int transClearBuffer(SConnBuffer* buf) {
|
||||||
memset(buf, 0, sizeof(*buf));
|
SConnBuffer* p = buf;
|
||||||
buf->total = -1;
|
if (p->cap > BUFFER_CAP) {
|
||||||
|
p->cap = BUFFER_CAP;
|
||||||
|
p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
|
||||||
|
}
|
||||||
|
p->left = -1;
|
||||||
|
p->len = 0;
|
||||||
|
p->total = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||||
|
SConnBuffer* p = connBuf;
|
||||||
|
if (p->left != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int total = connBuf->total;
|
||||||
|
*buf = taosMemoryCalloc(1, total);
|
||||||
|
memcpy(*buf, p->buf, total);
|
||||||
|
|
||||||
|
transResetBuffer(connBuf);
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
int transResetBuffer(SConnBuffer* connBuf) {
|
||||||
|
SConnBuffer* p = connBuf;
|
||||||
|
if (p->total <= p->len) {
|
||||||
|
int left = p->len - p->total;
|
||||||
|
memmove(p->buf, p->buf + p->total, left);
|
||||||
|
p->left = -1;
|
||||||
|
p->total = 0;
|
||||||
|
p->len = left;
|
||||||
|
} else {
|
||||||
|
p->left = -1;
|
||||||
|
p->total = 0;
|
||||||
|
p->len = 0;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||||
|
@ -126,54 +172,39 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||||
* |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
|
* |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
|
||||||
* info--->|
|
* info--->|
|
||||||
*/
|
*/
|
||||||
static const int CAPACITY = sizeof(STransMsgHead);
|
|
||||||
|
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
if (p->cap == 0) {
|
|
||||||
p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char));
|
|
||||||
tTrace("internal malloc mem:%p, size:%d", p->buf, CAPACITY);
|
|
||||||
p->len = 0;
|
|
||||||
p->cap = CAPACITY;
|
|
||||||
p->total = -1;
|
|
||||||
|
|
||||||
uvBuf->base = p->buf;
|
|
||||||
uvBuf->len = CAPACITY;
|
|
||||||
} else if (p->total == -1 && p->len < CAPACITY) {
|
|
||||||
uvBuf->base = p->buf + p->len;
|
|
||||||
uvBuf->len = CAPACITY - p->len;
|
|
||||||
} else {
|
|
||||||
p->cap = p->total;
|
|
||||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
|
||||||
tTrace("internal realloc mem:%p, size:%d", p->buf, p->cap);
|
|
||||||
|
|
||||||
uvBuf->base = p->buf + p->len;
|
uvBuf->base = p->buf + p->len;
|
||||||
|
if (p->left == -1) {
|
||||||
uvBuf->len = p->cap - p->len;
|
uvBuf->len = p->cap - p->len;
|
||||||
|
} else {
|
||||||
|
if (p->left < p->cap - p->len) {
|
||||||
|
uvBuf->len = p->left;
|
||||||
|
} else {
|
||||||
|
p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
|
||||||
|
uvBuf->base = p->buf + p->len;
|
||||||
|
uvBuf->len = p->left;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// check whether already read complete
|
// check whether already read complete
|
||||||
bool transReadComplete(SConnBuffer* connBuf) {
|
bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) {
|
SConnBuffer* p = connBuf;
|
||||||
|
if (p->len >= sizeof(STransMsgHead)) {
|
||||||
|
if (p->left == -1) {
|
||||||
STransMsgHead head;
|
STransMsgHead head;
|
||||||
memcpy((char*)&head, connBuf->buf, sizeof(head));
|
memcpy((char*)&head, connBuf->buf, sizeof(head));
|
||||||
int32_t msgLen = (int32_t)htonl(head.msgLen);
|
int32_t msgLen = (int32_t)htonl(head.msgLen);
|
||||||
connBuf->total = msgLen;
|
p->total = msgLen;
|
||||||
}
|
}
|
||||||
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
|
if (p->total >= p->len) {
|
||||||
return true;
|
p->left = p->total - p->len;
|
||||||
|
} else {
|
||||||
|
p->left = 0;
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
}
|
|
||||||
int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; }
|
|
||||||
|
|
||||||
int transUnpackMsg(STransMsgHead* msgHead) { return 0; }
|
|
||||||
int transDestroyBuffer(SConnBuffer* buf) {
|
|
||||||
if (buf->cap > 0) {
|
|
||||||
taosMemoryFreeClear(buf->buf);
|
|
||||||
}
|
}
|
||||||
transClearBuffer(buf);
|
return p->left == 0 ? true : false;
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream) {
|
int transSetConnOption(uv_tcp_t* stream) {
|
||||||
|
|
|
@ -212,9 +212,10 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvHandleReq(SSvrConn* pConn) {
|
static void uvHandleReq(SSvrConn* pConn) {
|
||||||
SConnBuffer* pBuf = &pConn->readBuf;
|
STransMsgHead* msg = NULL;
|
||||||
char* msg = pBuf->buf;
|
int msgLen = 0;
|
||||||
uint32_t msgLen = pBuf->len;
|
|
||||||
|
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
|
@ -761,6 +762,7 @@ static SSvrConn* createConn(void* hThrd) {
|
||||||
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
||||||
pConn->broken = false;
|
pConn->broken = false;
|
||||||
pConn->status = ConnNormal;
|
pConn->status = ConnNormal;
|
||||||
|
transInitBuffer(&pConn->readBuf);
|
||||||
|
|
||||||
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
||||||
exh->handle = pConn;
|
exh->handle = pConn;
|
||||||
|
|
Loading…
Reference in New Issue