Merge pull request #15766 from taosdata/feature/rpcOptSent
opt: rpc opt sent
This commit is contained in:
commit
d4d382b6d2
|
@ -47,8 +47,6 @@ typedef struct SRpcHandleInfo {
|
||||||
int8_t persistHandle; // persist handle or not
|
int8_t persistHandle; // persist handle or not
|
||||||
int8_t hasEpSet;
|
int8_t hasEpSet;
|
||||||
|
|
||||||
STraceId traceId;
|
|
||||||
|
|
||||||
// app info
|
// app info
|
||||||
void *ahandle; // app handle set by client
|
void *ahandle; // app handle set by client
|
||||||
void *wrapper; // wrapper handle
|
void *wrapper; // wrapper handle
|
||||||
|
@ -58,7 +56,8 @@ typedef struct SRpcHandleInfo {
|
||||||
void *rsp;
|
void *rsp;
|
||||||
int32_t rspLen;
|
int32_t rspLen;
|
||||||
|
|
||||||
// conn info
|
STraceId traceId;
|
||||||
|
|
||||||
SRpcConnInfo conn;
|
SRpcConnInfo conn;
|
||||||
} SRpcHandleInfo;
|
} SRpcHandleInfo;
|
||||||
|
|
||||||
|
|
|
@ -396,7 +396,7 @@ typedef enum ELogicConditionType {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
|
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
|
||||||
#else
|
#else
|
||||||
#define TSDB_MAX_RPC_THREADS 5
|
#define TSDB_MAX_RPC_THREADS 10
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||||
|
|
|
@ -128,7 +128,7 @@ typedef struct {
|
||||||
|
|
||||||
int8_t retryCnt;
|
int8_t retryCnt;
|
||||||
int8_t retryLimit;
|
int8_t retryLimit;
|
||||||
// bool setMaxRetry;
|
|
||||||
STransCtx appCtx; //
|
STransCtx appCtx; //
|
||||||
STransMsg* pRsp; // for synchronous API
|
STransMsg* pRsp; // for synchronous API
|
||||||
tsem_t* pSem; // for synchronous API
|
tsem_t* pSem; // for synchronous API
|
||||||
|
@ -195,17 +195,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
|
||||||
|
|
||||||
#define transLabel(trans) ((STrans*)trans)->label
|
#define transLabel(trans) ((STrans*)trans)->label
|
||||||
|
|
||||||
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
|
||||||
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
|
||||||
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
|
|
||||||
//
|
|
||||||
// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
|
||||||
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
|
|
||||||
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
|
|
||||||
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
|
|
||||||
|
|
||||||
void transFreeMsg(void* msg);
|
void transFreeMsg(void* msg);
|
||||||
|
|
||||||
//
|
//
|
||||||
typedef struct SConnBuffer {
|
typedef struct SConnBuffer {
|
||||||
char* buf;
|
char* buf;
|
||||||
|
@ -323,7 +313,7 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
|
||||||
// request list
|
// request list
|
||||||
typedef struct STransReq {
|
typedef struct STransReq {
|
||||||
queue q;
|
queue q;
|
||||||
void* data;
|
uv_write_t wreq;
|
||||||
} STransReq;
|
} STransReq;
|
||||||
|
|
||||||
void transReqQueueInit(queue* q);
|
void transReqQueueInit(queue* q);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
typedef struct SConnList {
|
typedef struct SConnList {
|
||||||
queue conn;
|
queue conn;
|
||||||
|
int32_t size;
|
||||||
} SConnList;
|
} SConnList;
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
|
@ -339,8 +340,8 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
|
||||||
uv_timer_stop(conn->timer);
|
uv_timer_stop(conn->timer);
|
||||||
}
|
}
|
||||||
conn->timer->data = NULL;
|
|
||||||
taosArrayPush(pThrd->timerList, &conn->timer);
|
taosArrayPush(pThrd->timerList, &conn->timer);
|
||||||
|
conn->timer->data = NULL;
|
||||||
conn->timer = NULL;
|
conn->timer = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -510,7 +511,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
SHashObj* pPool = pool;
|
SHashObj* pPool = pool;
|
||||||
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list;
|
SConnList list = {0};
|
||||||
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
|
taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list));
|
||||||
plist = taosHashGet(pPool, key, strlen(key));
|
plist = taosHashGet(pPool, key, strlen(key));
|
||||||
QUEUE_INIT(&plist->conn);
|
QUEUE_INIT(&plist->conn);
|
||||||
|
@ -519,15 +520,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
if (QUEUE_IS_EMPTY(&plist->conn)) {
|
if (QUEUE_IS_EMPTY(&plist->conn)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
plist->size -= 1;
|
||||||
queue* h = QUEUE_HEAD(&plist->conn);
|
queue* h = QUEUE_HEAD(&plist->conn);
|
||||||
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
|
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
QUEUE_REMOVE(&conn->q);
|
QUEUE_REMOVE(&conn->q);
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
|
|
||||||
|
if (conn->task != NULL) {
|
||||||
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
||||||
conn->task = NULL;
|
conn->task = NULL;
|
||||||
|
}
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
|
@ -539,6 +543,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
|
|
||||||
allocConnRef(conn, true);
|
allocConnRef(conn, true);
|
||||||
|
|
||||||
|
if (conn->timer != NULL) {
|
||||||
|
uv_timer_stop(conn->timer);
|
||||||
|
taosArrayPush(thrd->timerList, &conn->timer);
|
||||||
|
conn->timer->data = NULL;
|
||||||
|
conn->timer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
STrans* pTransInst = thrd->pTransInst;
|
STrans* pTransInst = thrd->pTransInst;
|
||||||
cliReleaseUnfinishedMsg(conn);
|
cliReleaseUnfinishedMsg(conn);
|
||||||
transQueueClear(&conn->cliMsgs);
|
transQueueClear(&conn->cliMsgs);
|
||||||
|
@ -556,13 +567,17 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
assert(conn->list != NULL);
|
assert(conn->list != NULL);
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
QUEUE_PUSH(&conn->list->conn, &conn->q);
|
QUEUE_PUSH(&conn->list->conn, &conn->q);
|
||||||
|
conn->list->size += 1;
|
||||||
|
|
||||||
|
conn->task = NULL;
|
||||||
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
|
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
|
||||||
|
|
||||||
|
if (conn->list->size >= 50) {
|
||||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||||
arg->param1 = conn;
|
arg->param1 = conn;
|
||||||
arg->param2 = thrd;
|
arg->param2 = thrd;
|
||||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
||||||
if (update) {
|
if (update) {
|
||||||
|
@ -1374,7 +1389,7 @@ int transReleaseCliHandle(void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsg tmsg = {.info.handle = handle};
|
STransMsg tmsg = {.info.handle = handle};
|
||||||
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
// TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
|
||||||
|
|
||||||
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
cmsg->msg = tmsg;
|
cmsg->msg = tmsg;
|
||||||
|
@ -1415,7 +1430,6 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
||||||
if (ctx != NULL) {
|
if (ctx != NULL) {
|
||||||
pCtx->appCtx = *ctx;
|
pCtx->appCtx = *ctx;
|
||||||
}
|
}
|
||||||
assert(pTransInst->connType == TAOS_CONN_CLIENT);
|
|
||||||
|
|
||||||
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
cliMsg->ctx = pCtx;
|
cliMsg->ctx = pCtx;
|
||||||
|
|
|
@ -23,33 +23,6 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
|
||||||
static int32_t refMgt;
|
static int32_t refMgt;
|
||||||
static int32_t instMgt;
|
static int32_t instMgt;
|
||||||
|
|
||||||
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
|
||||||
T_MD5_CTX context;
|
|
||||||
int ret = -1;
|
|
||||||
|
|
||||||
tMD5Init(&context);
|
|
||||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
|
||||||
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
|
|
||||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
|
||||||
tMD5Final(&context);
|
|
||||||
|
|
||||||
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
|
|
||||||
T_MD5_CTX context;
|
|
||||||
|
|
||||||
tMD5Init(&context);
|
|
||||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
|
||||||
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
|
|
||||||
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
|
|
||||||
tMD5Final(&context);
|
|
||||||
|
|
||||||
memcpy(pAuth, context.digest, sizeof(context.digest));
|
|
||||||
}
|
|
||||||
|
|
||||||
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
|
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
|
||||||
return false;
|
return false;
|
||||||
// SRpcHead* pHead = rpcHeadFromCont(pCont);
|
// SRpcHead* pHead = rpcHeadFromCont(pCont);
|
||||||
|
@ -176,7 +149,6 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||||
* info--->|
|
* info--->|
|
||||||
*/
|
*/
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
|
|
||||||
uvBuf->base = p->buf + p->len;
|
uvBuf->base = p->buf + p->len;
|
||||||
if (p->left == -1) {
|
if (p->left == -1) {
|
||||||
uvBuf->len = p->cap - p->len;
|
uvBuf->len = p->cap - p->len;
|
||||||
|
@ -184,7 +156,8 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||||
if (p->left < p->cap - p->len) {
|
if (p->left < p->cap - p->len) {
|
||||||
uvBuf->len = p->left;
|
uvBuf->len = p->left;
|
||||||
} else {
|
} else {
|
||||||
p->buf = taosMemoryRealloc(p->buf, p->left + p->len);
|
p->cap = p->left + p->len;
|
||||||
|
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||||
uvBuf->base = p->buf + p->len;
|
uvBuf->base = p->buf + p->len;
|
||||||
uvBuf->len = p->left;
|
uvBuf->len = p->left;
|
||||||
}
|
}
|
||||||
|
@ -266,14 +239,9 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
|
||||||
uv_async_t* async = &(pool->asyncs[idx]);
|
uv_async_t* async = &(pool->asyncs[idx]);
|
||||||
SAsyncItem* item = async->data;
|
SAsyncItem* item = async->data;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
taosThreadMutexLock(&item->mtx);
|
taosThreadMutexLock(&item->mtx);
|
||||||
QUEUE_PUSH(&item->qmsg, q);
|
QUEUE_PUSH(&item->qmsg, q);
|
||||||
taosThreadMutexUnlock(&item->mtx);
|
taosThreadMutexUnlock(&item->mtx);
|
||||||
int64_t el = taosGetTimestampUs() - st;
|
|
||||||
if (el > 50) {
|
|
||||||
// tInfo("lock and unlock cost:%d", (int)el);
|
|
||||||
}
|
|
||||||
return uv_async_send(async);
|
return uv_async_send(async);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,30 +317,21 @@ void transReqQueueInit(queue* q) {
|
||||||
QUEUE_INIT(q);
|
QUEUE_INIT(q);
|
||||||
}
|
}
|
||||||
void* transReqQueuePush(queue* q) {
|
void* transReqQueuePush(queue* q) {
|
||||||
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
|
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
|
||||||
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
|
req->wreq.data = req;
|
||||||
wreq->data = req;
|
QUEUE_PUSH(q, &req->q);
|
||||||
req->data = wreq;
|
return &req->wreq;
|
||||||
QUEUE_PUSH(q, &wreq->q);
|
|
||||||
return req;
|
|
||||||
}
|
}
|
||||||
void* transReqQueueRemove(void* arg) {
|
void* transReqQueueRemove(void* arg) {
|
||||||
void* ret = NULL;
|
void* ret = NULL;
|
||||||
uv_write_t* req = arg;
|
uv_write_t* wreq = arg;
|
||||||
STransReq* wreq = req && req->data ? req->data : NULL;
|
|
||||||
|
|
||||||
assert(wreq->data == req);
|
STransReq* req = wreq ? wreq->data : NULL;
|
||||||
if (wreq == NULL || wreq->data == NULL) {
|
if (req == NULL) return NULL;
|
||||||
taosMemoryFree(wreq->data);
|
QUEUE_REMOVE(&req->q);
|
||||||
taosMemoryFree(wreq);
|
|
||||||
return req;
|
|
||||||
}
|
|
||||||
|
|
||||||
QUEUE_REMOVE(&wreq->q);
|
ret = wreq && wreq->handle ? wreq->handle->data : NULL;
|
||||||
|
taosMemoryFree(req);
|
||||||
ret = req && req->handle ? req->handle->data : NULL;
|
|
||||||
taosMemoryFree(wreq->data);
|
|
||||||
taosMemoryFree(wreq);
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -381,7 +340,6 @@ void transReqQueueClear(queue* q) {
|
||||||
queue* h = QUEUE_HEAD(q);
|
queue* h = QUEUE_HEAD(q);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
STransReq* req = QUEUE_DATA(h, STransReq, q);
|
STransReq* req = QUEUE_DATA(h, STransReq, q);
|
||||||
taosMemoryFree(req->data);
|
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,7 +75,6 @@ typedef struct SWorkThrd {
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
uv_prepare_t* prepare;
|
uv_prepare_t* prepare;
|
||||||
queue msg;
|
queue msg;
|
||||||
TdThreadMutex msgMtx;
|
|
||||||
|
|
||||||
queue conn;
|
queue conn;
|
||||||
void* pTransInst;
|
void* pTransInst;
|
||||||
|
@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
tError("unexcept occurred, continue");
|
tError("unexcept occurred, continue");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// release handle to rpc init
|
// release handle to rpc init
|
||||||
if (msg->type == Quit) {
|
if (msg->type == Quit) {
|
||||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||||
|
@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
|
||||||
pThrd->pipe->data = pThrd;
|
pThrd->pipe->data = pThrd;
|
||||||
|
|
||||||
QUEUE_INIT(&pThrd->msg);
|
QUEUE_INIT(&pThrd->msg);
|
||||||
taosThreadMutexInit(&pThrd->msgMtx, NULL);
|
|
||||||
|
|
||||||
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
|
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
|
||||||
uv_prepare_init(pThrd->loop, pThrd->prepare);
|
uv_prepare_init(pThrd->loop, pThrd->prepare);
|
||||||
|
|
|
@ -75,15 +75,14 @@ void processShellMsg() {
|
||||||
|
|
||||||
void *handle = pRpcMsg->info.handle;
|
void *handle = pRpcMsg->info.handle;
|
||||||
taosFreeQitem(pRpcMsg);
|
taosFreeQitem(pRpcMsg);
|
||||||
|
//{
|
||||||
{
|
// SRpcMsg nRpcMsg = {0};
|
||||||
SRpcMsg nRpcMsg = {0};
|
// nRpcMsg.pCont = rpcMallocCont(msgSize);
|
||||||
nRpcMsg.pCont = rpcMallocCont(msgSize);
|
// nRpcMsg.contLen = msgSize;
|
||||||
nRpcMsg.contLen = msgSize;
|
// nRpcMsg.info.handle = handle;
|
||||||
nRpcMsg.info.handle = handle;
|
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
|
||||||
nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
|
// rpcSendResponse(&nRpcMsg);
|
||||||
rpcSendResponse(&nRpcMsg);
|
//}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
||||||
|
|
Loading…
Reference in New Issue