refactor transport

This commit is contained in:
yihaoDeng 2024-09-04 16:50:52 +08:00
parent 603de3976c
commit f9290cf13f
4 changed files with 154 additions and 461 deletions

View File

@ -30,9 +30,9 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
(void)qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) {
int32_t code = -1;
SMnode *pMnode = pMsg->info.node;
int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t code = -1;
SMnode *pMnode = pMsg->info.node;
SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb, .pWorkerCb = pInfo->workerCb};
@ -67,26 +67,25 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) {
return code;
}
static FORCE_INLINE void mnodeFreeSBatchRspMsg(void* p) {
static FORCE_INLINE void mnodeFreeSBatchRspMsg(void *p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
SBatchRspMsg *pRsp = (SBatchRspMsg *)p;
rpcFreeCont(pRsp->msg);
}
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
int32_t code = 0;
int32_t rspSize = 0;
SBatchReq batchReq = {0};
SBatchMsg req = {0};
int32_t code = 0;
int32_t rspSize = 0;
SBatchReq batchReq = {0};
SBatchMsg req = {0};
SBatchRspMsg rsp = {0};
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg;
void *pRsp = NULL;
SMnode *pMnode = pMsg->info.node;
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg;
void *pRsp = NULL;
SMnode *pMnode = pMsg->info.node;
if ((code = tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) != 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -94,7 +93,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
goto _exit;
}
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
mError("too many msgs %d in mnode batch meta req", msgNum);
@ -108,7 +107,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
SBatchMsg* req = taosArrayGet(batchReq.pMsgs, i);
SBatchMsg *req = taosArrayGet(batchReq.pMsgs, i);
reqMsg.msgType = req->msgType;
reqMsg.pCont = req->msg;

View File

@ -982,6 +982,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
SCH_ERR_JRET(code);
}
trans->pHandle = (void *)refId;
pMsgSendInfo->msgInfo.handle = trans->pHandle;
}
if (pJob && pTask) {

View File

@ -80,6 +80,7 @@ typedef struct SCliConn {
HeapNode node; // for heap
int8_t inHeap;
int32_t reqRefCnt;
uint32_t clientIp;
uint32_t serverIp;
@ -172,6 +173,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn);
static void cliSendBatch_shareConnCb(uv_write_t* req, int status);
void cliSendBatch_shareConn(SCliConn* pConn);
int32_t cliSend2(SCliConn* conn);
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead);
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
// register timer for read
@ -190,7 +192,7 @@ static void cliAsyncCb(uv_async_t* handle);
// static void cliIdleCb(uv_idle_t* handle);
// static void cliPrepareCb(uv_prepare_t* handle);
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
// static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
static void cliSendBatchCb(uv_write_t* req, int status);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
@ -202,11 +204,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
void cliResetConnTimer(SCliConn* conn);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
static int32_t cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
// static int32_t cliSend(SCliConn* pConn);
// static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq);
@ -459,16 +461,6 @@ void cliResetConnTimer(SCliConn* conn) {
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
bool cliShouldAddConnToPool(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
bool empty = transQueueEmpty(&conn->reqs);
if (empty) {
(void)delConnFromHeapCache(pThrd->connHeapCache, conn);
}
return empty;
}
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int32_t code = 0;
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
@ -485,6 +477,7 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int8_t cliMayRecycleConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
if (transQueueSize(&conn->reqs) == 0) {
(void)delConnFromHeapCache(pThrd->connHeapCache, conn);
addConnToPool(pThrd->pool, conn);
return 1;
}
@ -494,12 +487,13 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) {
pResp->contLen = transContLenFromMsg(pHead->msgLen);
pResp->pCont = transContFromHead((char*)pHead);
pResp->code = pReq->msg.code;
pResp->msgType = pReq->msg.msgType;
pResp->code = pHead->code;
pResp->msgType = pHead->msgType;
pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL;
pResp->info.traceId = pReq->msg.info.traceId;
pResp->info.hasEpSet = pReq->msg.info.hasEpSet;
pResp->info.cliVer = pReq->msg.info.cliVer;
pResp->info.traceId = pHead->traceId;
pResp->info.hasEpSet = pHead->hasEpSet;
pResp->info.cliVer = htonl(pHead->compatibilityVer);
pResp->info.seqNum = htonl(pHead->seqNum);
return 0;
}
void cliHandleResp2(SCliConn* conn) {
@ -525,17 +519,27 @@ void cliHandleResp2(SCliConn* conn) {
return;
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
SCliReq* pReq = NULL;
int32_t seq = htonl(pHead->seqNum);
code = cliGetReqBySeq(conn, seq, &pReq);
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
if (cliConnRmReleaseReq(conn, pHead)) {
return;
} else {
}
tDebug("%s conn %p recv unexpected packet, seqNum:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
tstrerror(code));
// TODO: notify cb
if (cliMayRecycleConn(conn)) {
return;
}
return;
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
// TODO handle release req
// if (cliRecvReleaseReq(conn, pHead)) {
// return;
@ -545,6 +549,9 @@ void cliHandleResp2(SCliConn* conn) {
code = cliBuildRespFromCont(pReq, &resp, pHead);
if (code != 0) {
tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq);
} else {
tDebug("%s conn %p %s received from %s, local info:%s, len:%d, seq:%d", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(resp.msgType), conn->dst, conn->src, pHead->msgLen, seq);
}
code = cliNotifyCb(conn, pReq, &resp);
@ -1054,6 +1061,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
uv_read_stop(conn->stream);
conn->seq = 0;
int32_t code = allocConnRef(conn, true);
@ -1240,7 +1248,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception);
// TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception);
transQueuePush(&pConn->reqs, pReq);
@ -1348,60 +1356,6 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn);
}
// static bool cliHandleNoResp(SCliConn* conn) {
// bool res = false;
// if (!transQueueEmpty(&conn->reqs)) {
// SCliReq* pReq = transQueueGet(&conn->reqs, 0);
// if (REQUEST_NO_RESP(&pReq->msg)) {
// (void)transQueuePop(&conn->reqs);
// destroyReq(pReq);
// res = true;
// }
// if (res == true) {
// if (cliMaySendCachedMsg(conn) == false) {
// SCliThrd* thrd = conn->hostThrd;
// addConnToPool(thrd->pool, conn);
// res = false;
// } else {
// res = true;
// }
// }
// }
// return res;
// }
static void cliSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
// SCliConn* pConn = transReqQueueRemove(req);
// if (pConn == NULL) return;
// SCliReq* pReq = transQueueGet(&pConn->reqs, 0);
// if (pReq != NULL) {
// int64_t cost = taosGetTimestampUs() - pReq->st;
// if (cost > 1000 * 50) {
// tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
// }
// }
// if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) {
// rpcFreeCont(pReq->msg.pCont);
// pReq->msg.pCont = 0;
// }
// if (status == 0) {
// tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
// } else {
// if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
// tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
// cliHandleExcept(pConn, -1);
// }
// return;
// }
// if (cliHandleNoResp(pConn) == true) {
// tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
// return;
// }
// (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
int32_t code = 0;
@ -1449,8 +1403,6 @@ static int32_t cliShouldSendMsg(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->sent == 0) {
// pReq->sent = 1;
// pReq->seq = conn->seq;
return 1;
}
}
@ -1463,23 +1415,18 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
cliConnRmReqs(conn);
if (status != 0) {
tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
cliHandleBatch_shareConnExcept(conn);
}
return;
}
int ret = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
taosMemoryFree(req);
if (ret != 0) {
tError("%s conn %p failed to start read, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(ret));
cliHandleBatch_shareConnExcept(conn);
}
if (cliShouldSendMsg(conn) == 1) {
if (!cliMayRecycleConn(conn)) {
cliSendBatch_shareConn(conn);
}
taosMemoryFree(req);
}
void cliSendBatch_shareConn(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
@ -1489,7 +1436,6 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
int32_t totalLen = 0;
if (size == 0) {
tError("%s conn %p not msg to send", pInst->label, pConn);
ASSERT(0);
return;
}
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
@ -1542,108 +1488,24 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
pCliMsg->sent = 1;
pCliMsg->seq = pConn->seq;
STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq);
}
if (j == 0) {
taosMemoryFree(wb);
return;
}
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn;
pConn->shareCnt += 1;
tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size,
totalLen);
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb);
taosMemoryFree(wb);
}
void cliSendBatch(SCliConn* pConn) {
int32_t code = 0;
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
SCliBatch* pBatch = pConn->pBatch;
int32_t wLen = pBatch->wLen;
pBatch->pList->connCnt += 1;
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
if (wb == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
goto _exception;
}
int i = 0;
queue* h = NULL;
QUEUE_FOREACH(h, &pBatch->wq) {
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
SReqCtx* pCtx = pCliMsg->ctx;
STransMsg* pReq = (STransMsg*)(&pCliMsg->msg);
if (pReq->pCont == 0) {
pReq->pCont = (void*)rpcMallocCont(0);
if (pReq->pCont == NULL) {
code = TSDB_CODE_OUT_OF_BUFFER;
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
goto _exception;
}
pReq->contLen = 0;
}
pConn->seq++;
int msgLen = transMsgLenFromCont(pReq->contLen);
STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
if (pHead->comp == 0) {
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
pHead->msgType = pReq->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
memcpy(pHead->user, pInst->user, strlen(pInst->user));
pHead->traceId = pReq->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->seqNum = htonl(pConn->seq);
if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
} else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
wb[i++] = uv_buf_init((char*)pHead, msgLen);
}
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
if (req == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code));
goto _exception;
}
req->data = pConn;
tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn,
pBatch->wLen, pBatch->batchSize);
code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb);
if (code != 0) {
tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code));
goto _exception;
}
taosMemoryFree(wb);
return;
_exception:
cliDestroyBatch(pBatch);
taosMemoryFree(wb);
pConn->pBatch = NULL;
return;
}
// int32_t cliSend2(SCliConn* pConn) {}
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
transQueuePush(&pConn->reqs, pCliMsg);
@ -1655,79 +1517,6 @@ int32_t cliSend2(SCliConn* pConn) {
cliSendBatch_shareConn(pConn);
return 0;
}
int32_t cliSend(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
SCliReq* pCliReq = NULL;
int32_t code = cliConnFindToSendMsg(pConn, &pCliReq);
if (code != 0) {
return code;
}
SReqCtx* pCtx = pCliReq->ctx;
STransMsg* pReq = (STransMsg*)(&pCliReq->msg);
if (pReq->pCont == 0) {
pReq->pCont = (void*)rpcMallocCont(0);
if (pReq->pCont == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDebug("malloc memory: %p", pReq->pCont);
pReq->contLen = 0;
}
int msgLen = transMsgLenFromCont(pReq->contLen);
STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
if (pHead->comp == 0) {
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
pHead->msgType = pReq->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliReq) ? 1 : 0;
memcpy(pHead->user, pInst->user, strlen(pInst->user));
pHead->traceId = pReq->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
pHead->seqNum = htonl(pConn->seq++);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
STraceId* trace = &pReq->info.traceId;
if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
} else {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
uv_write_t* aReq = transReqQueuePush(&pConn->wreqQueue);
if (aReq == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
}
pCliReq->sent = 1;
int status = uv_write(aReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) {
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
uv_err_name(status));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _exception);
}
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
_exception:
return code;
}
static void cliDestroyBatch(SCliBatch* pBatch) {
if (pBatch == NULL) return;
@ -1808,83 +1597,6 @@ _exception2:
// // taosMemoryFree(conn);
return code;
}
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return;
}
int32_t code = 0;
if (pThrd->quit == true) {
cliDestroyBatch(pBatch);
return;
}
STrans* pInst = pThrd->pInst;
SCliBatchList* pList = pBatch->pList;
bool exceed = false;
SCliConn* conn = getConnFromPool(pThrd, pList->dst, &exceed);
if (conn == NULL && exceed) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pInst->label, pBatch->wLen,
pBatch->batchSize, pInst->connLimitNum);
cliDestroyBatch(pBatch);
return;
}
if (conn == NULL) {
code = cliCreateConn(pThrd, &conn, pList->ip, pList->port);
if (code != 0) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label,
pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code));
cliDestroyBatch(pBatch);
return;
}
conn->pBatch = pBatch;
code = cliDoConn(pThrd, conn);
if (code != 0) {
}
return;
}
conn->pBatch = pBatch;
cliSendBatch(conn);
}
static void cliSendBatchCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SCliConn* conn = req->data;
SCliThrd* thrd = conn->hostThrd;
SCliBatch* p = conn->pBatch;
conn->pBatch = NULL;
SCliBatch* nxtBatch = cliGetHeadFromList(p->pList);
p->pList->connCnt -= 1;
if (status != 0) {
tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn,
p->wLen, p->batchSize, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) cliHandleExcept(conn, -1);
cliHandleBatchReq(nxtBatch, thrd);
} else {
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
p->batchSize);
if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) {
if (nxtBatch != NULL) {
conn->pBatch = nxtBatch;
cliSendBatch(conn);
} else {
addConnToPool(thrd->pool, conn);
}
} else {
cliDestroyBatch(nxtBatch);
}
}
cliDestroyBatch(p);
taosMemoryFree(req);
}
static void cliHandleFastFail_resp(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd;
@ -1963,13 +1675,8 @@ void cliConnCb(uv_connect_t* req, int status) {
cliConnSetSockInfo(pConn);
addConnToHeapCache(pThrd->connHeapCache, pConn);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->pBatch != NULL) {
return cliSendBatch(pConn);
}
if (pConn->inHeap) {
return cliSendBatch_shareConn(pConn);
}
(void)cliSend2(pConn);
}
@ -2236,71 +1943,34 @@ static void doFreeTimeoutMsg(void* param) {
taosMemoryFree(arg);
}
void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) {
int32_t code = 0;
int32_t lino = 0;
STransMsg resp = {0};
code = (pThrd->initCb)(pThrd, pReq, NULL);
TAOS_CHECK_GOTO(code, &lino, _exception);
STraceId* trace = &pReq->msg.info.traceId;
STrans* pInst = pThrd->pInst;
char addr[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
if (pConn == NULL) {
tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn);
bool ignore = false;
pConn = getConnFromPool(pThrd, addr, &ignore);
if (pConn != NULL) {
addConnToHeapCache(pThrd->connHeapCache, pConn);
transQueuePush(&pConn->reqs, pReq);
return cliSendBatch_shareConn(pConn);
}
} else {
tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn);
transQueuePush(&pConn->reqs, pReq);
cliSendBatch_shareConn(pConn);
return;
}
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception);
transQueuePush(&pConn->reqs, pReq);
code = cliDoConn(pThrd, pConn);
_exception:
resp.code = code;
(void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);
return;
}
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
int32_t lino = 0;
STransMsg resp = {0};
int32_t code = (pThrd->initCb)(pThrd, pReq, NULL);
TAOS_CHECK_GOTO(code, &lino, _exception);
char addr[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
STrans* pInst = pThrd->pInst;
SCliConn* pConn = NULL;
code = cliGetOrCreateConn(pThrd, pReq, &pConn);
if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
TAOS_CHECK_GOTO(code, &lino, _exception);
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// do nothing, notiy
return;
} else {
code = cliSendReq(pConn, pReq);
pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
if (pConn == NULL) {
code = cliGetOrCreateConn(pThrd, pReq, &pConn);
if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
TAOS_CHECK_GOTO(code, &lino, _exception);
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// do nothing, notiy
return;
} else {
ASSERT(code == 0);
addConnToHeapCache(pThrd->connHeapCache, pConn);
}
}
code = cliSendReq(pConn, pReq);
tTrace("%s conn %p ready", pInst->label, pConn);
return;
@ -2311,14 +1981,7 @@ _exception:
return;
}
void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) {
// STrans* pInst = pThrd->pInst;
// if (pInst->shareConn == 1) {
// return cliHandleReq__shareConn(pThrd, pReq);
// } else {
return cliHandleReq__noShareConn(pThrd, pReq);
//}
}
void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleReq__noShareConn(pThrd, pReq); }
static void cliDoReq(queue* wq, SCliThrd* pThrd) {
int count = 0;
@ -2489,23 +2152,23 @@ static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) {
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) {
cliBuildBatch(pReq, h, pThrd);
continue;
}
// if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) {
// cliBuildBatch(pReq, h, pThrd);
// continue;
// }
(*cliAsyncHandle[pReq->type])(pThrd, pReq);
count++;
}
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) {
SCliBatchList* batchList = (SCliBatchList*)(*pIter);
SCliBatch* batch = cliGetHeadFromList(batchList);
if (batch != NULL) {
cliHandleBatchReq(batch, pThrd);
}
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
// void** pIter = taosHashIterate(pThrd->batchCache, NULL);
// while (pIter != NULL) {
// SCliBatchList* batchList = (SCliBatchList*)(*pIter);
// SCliBatch* batch = cliGetHeadFromList(batchList);
// if (batch != NULL) {
// cliHandleBatchReq(batch, pThrd);
// }
// pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
// }
if (count >= 2) {
tTrace("cli process batch size:%d", count);
@ -2555,6 +2218,20 @@ void cliConnFreeMsgs(SCliConn* conn) {
cmsg->ctx->ahandle = NULL;
}
}
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pHead->ahandle == (uint64_t)pReq->ctx->ahandle) {
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
transQueueRm(&conn->reqs, i);
return true;
}
}
}
return false;
}
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
uint64_t ahandle = pHead->ahandle;
@ -3778,7 +3455,6 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) {
pReq->ctx = pCtx;
pReq->type = Update;
// pReq->refId = (int64_t)pInstRef;
SCliThrd* thrd = ((SCliObj*)pInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid);
@ -3898,7 +3574,10 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
if (code != 0) {
tDebug("failed to get conn from heap cache for key:%s", key);
return NULL;
} else {
tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, pConn->reqRefCnt);
}
return pConn;
}
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
@ -3908,7 +3587,9 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
if (code != 0) {
return code;
}
return transHeapInsert(p, pConn);
code = transHeapInsert(p, pConn);
tDebug("add conn %p to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
pConn->reqRefCnt);
}
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
@ -3961,7 +3642,9 @@ int32_t transHeapGet(SHeap* heap, SCliConn** p) {
}
int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
// impl later
p->reqRefCnt++;
if (p->inHeap == 1) {
tDebug("failed to insert conn %p since already in heap", p);
return TSDB_CODE_DUP_KEY;
}
@ -3972,8 +3655,18 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
// impl later
if (p->inHeap == 0) {
tDebug("failed to del conn %p since not in heap", p);
return TSDB_CODE_INVALID_PARA;
}
heapRemove(heap->heap, &p->node);
p->inHeap = 0;
p->reqRefCnt--;
if (p->reqRefCnt == 0) {
heapRemove(heap->heap, &p->node);
tDebug("delete conn %p delete from heap", p);
} else if (p->reqRefCnt < 0) {
tDebug("conn %p has %d reqs, not delete from heap,assert", p, p->reqRefCnt);
} else {
tDebug("conn %p has %d reqs, not delete from heap", p, p->reqRefCnt);
}
return 0;
}

View File

@ -662,21 +662,22 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
return TSDB_CODE_INVALID_MSG;
}
if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
if (smsg->type == Release) pHead->msgType = 0;
} else {
if (smsg->type == Release) {
pHead->msgType = 0;
pConn->status = ConnNormal;
destroyConnRegArg(pConn);
transUnrefSrvHandle(pConn);
} else {
// set up resp msg type
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
}
}
// if (pConn->status == ConnNormal) {
// pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
// if (smsg->type == Release) pHead->msgType = 0;
// } else {
// if (smsg->type == Release) {
// pHead->msgType = 0;
// pConn->status = ConnNormal;
// destroyConnRegArg(pConn);
// transUnrefSrvHandle(pConn);
// } else {
// // set up resp msg type
// pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
// }
// }
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code);
pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead));
@ -835,13 +836,13 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
}
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
int32_t code = reallocConnRef(pConn);
if (code != 0) {
destroyConn(pConn, true);
return true;
}
tTrace("conn %p received release request", pConn);
// int32_t code = reallocConnRef(pConn);
// if (code != 0) {
// destroyConn(pConn, true);
// return true;
// }
tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId;
(void)transClearBuffer(&pConn->readBuf);
transFreeMsg(transContFromHead((char*)pHead));
@ -850,7 +851,11 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
}
pConn->status = ConnRelease;
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
STransMsg tmsg = {.code = 0,
.info.handle = (void*)pConn,
.info.traceId = traceId,
.info.ahandle = (void*)0x9527,
.info.seqNum = htonl(pHead->seqNum)};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
@ -1590,11 +1595,6 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
int32_t code = 0;
SSvrConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
code = reallocConnRef(conn);
if (code != 0) {
destroyConn(conn, true);
return;
}
if (!transQueuePush(&conn->srvMsgs, msg)) {
return;
}