Merge remote-tracking branch 'origin/3.0' into enh/opt-transport
This commit is contained in:
parent
08f962a1d6
commit
e7510e8cb3
|
@ -615,7 +615,7 @@ void cliHandleResp(SCliConn* conn) {
|
|||
}
|
||||
if (code != 0) {
|
||||
tWarn("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64
|
||||
", the sever may sends repeated response,reason:%s",
|
||||
", the sever may sends repeated response since %s",
|
||||
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
|
||||
// TODO: notify cb
|
||||
taosMemoryFree(pHead);
|
||||
|
@ -1195,7 +1195,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
|
|||
}
|
||||
cliConnRmReqs(conn);
|
||||
if (status != 0) {
|
||||
tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
|
||||
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
|
||||
TAOS_UNUSED(transUnrefCliHandle(conn));
|
||||
return;
|
||||
}
|
||||
|
@ -1214,7 +1214,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
|
|||
if (!cliMayRecycleConn(conn)) {
|
||||
code = cliBatchSend(conn);
|
||||
if (code != 0) {
|
||||
tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
|
||||
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
|
||||
TAOS_UNUSED(transUnrefCliHandle(conn));
|
||||
}
|
||||
}
|
||||
|
@ -1335,7 +1335,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
|
|||
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen);
|
||||
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
||||
if (ret != 0) {
|
||||
tError("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
||||
freeWReqToWQ(&pConn->wq, req->data);
|
||||
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
||||
|
@ -1387,12 +1387,12 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
|
|||
|
||||
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
||||
if (ret != 0) {
|
||||
tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
||||
tError("%s conn %p failed to set stream since %s", transLabel(pInst), conn, uv_err_name(ret));
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||
}
|
||||
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
|
||||
if (ret != 0) {
|
||||
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
||||
tError("%s conn %p failed to set socket opt since %s", transLabel(pInst), conn, uv_err_name(ret));
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||
return code;
|
||||
}
|
||||
|
@ -1400,7 +1400,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
|
|||
transRefCliHandle(conn);
|
||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||
if (ret != 0) {
|
||||
tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret));
|
||||
tError("failed connect to %s since %s", conn->dstAddr, uv_err_name(ret));
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
|
||||
}
|
||||
|
||||
|
@ -1408,19 +1408,19 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
|
|||
transRefCliHandle(conn);
|
||||
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||
if (ret != 0) {
|
||||
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
|
||||
tError("%s conn %p failed to start timer since %s", transLabel(pInst), conn, uv_err_name(ret));
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2);
|
||||
}
|
||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||
|
||||
_exception1:
|
||||
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code));
|
||||
tError("%s conn %p failed to do connect since %s", transLabel(pInst), conn, tstrerror(code));
|
||||
cliDestroyConn(conn, true);
|
||||
return code;
|
||||
|
||||
_exception2:
|
||||
TAOS_UNUSED(transUnrefCliHandle(conn));
|
||||
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code));
|
||||
tError("%s conn %p failed to do connect since %s", transLabel(pInst), conn, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1490,7 +1490,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
STUB_RAND_NETWORK_ERR(status);
|
||||
|
||||
if (status != 0) {
|
||||
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
|
||||
tDebug("%s conn %p failed to connect to %s since %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
|
||||
uv_strerror(status));
|
||||
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
||||
return;
|
||||
|
@ -1806,7 +1806,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
|
|||
_exception:
|
||||
resp.code = code;
|
||||
STraceId* trace = &pReq->msg.info.traceId;
|
||||
tGWarn("%s failed to process req, reason:%s", pInst->label, tstrerror(code));
|
||||
tGWarn("%s failed to process req since %s", pInst->label, tstrerror(code));
|
||||
|
||||
code = (pThrd->notifyExceptCb)(pThrd, pReq, &resp);
|
||||
if (code != 0) {
|
||||
|
@ -1917,7 +1917,7 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
|
|||
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
|
||||
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
|
||||
if (pBatchList == NULL) {
|
||||
tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return terrno;
|
||||
}
|
||||
QUEUE_INIT(&pBatchList->wq);
|
||||
|
@ -1933,7 +1933,7 @@ static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip,
|
|||
taosMemoryFree(pBatchList->ip);
|
||||
taosMemoryFree(pBatchList->dst);
|
||||
taosMemoryFree(pBatchList);
|
||||
tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return terrno;
|
||||
}
|
||||
*ppBatchList = pBatchList;
|
||||
|
@ -1957,7 +1957,7 @@ static void destroyBatchList(SCliBatchList* pList) {
|
|||
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) {
|
||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||
if (pBatch == NULL) {
|
||||
tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -1985,13 +1985,13 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
// batch process to avoid to lock/unlock frequently
|
||||
queue wq;
|
||||
if (taosThreadMutexLock(&item->mtx) != 0) {
|
||||
tError("failed to lock mutex, reason:%s", tstrerror(terrno));
|
||||
tError("failed to lock mutex since %s", tstrerror(terrno));
|
||||
}
|
||||
|
||||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
|
||||
if (taosThreadMutexUnlock(&item->mtx) != 0) {
|
||||
tError("failed to unlock mutex, reason:%s", tstrerror(terrno));
|
||||
tError("failed to unlock mutex since %s", tstrerror(terrno));
|
||||
}
|
||||
|
||||
cliDealFunc[pInst->supportBatch](&wq, pThrd);
|
||||
|
@ -2146,7 +2146,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
|
|||
|
||||
code = uv_loop_init(pThrd->loop);
|
||||
if (code != 0) {
|
||||
tError("failed to init uv_loop, reason:%s", uv_err_name(code));
|
||||
tError("failed to init uv_loop since %s", uv_err_name(code));
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
|
||||
}
|
||||
|
||||
|
@ -2264,7 +2264,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
}
|
||||
|
||||
if (taosThreadJoin(pThrd->thread, NULL) != 0) {
|
||||
tTrace("failed to join thread, reason:%s", tstrerror(terrno));
|
||||
tTrace("failed to join thread since %s", tstrerror(terrno));
|
||||
}
|
||||
|
||||
CLI_RELEASE_UV(pThrd->loop);
|
||||
|
@ -2591,7 +2591,7 @@ void cliRetryUpdateRule(SReqCtx* pCtx, int8_t noDelay) {
|
|||
int32_t cliRetryDoSched(SCliReq* pReq, SCliThrd* pThrd) {
|
||||
int32_t code = cliSchedMsgToNextNode(pReq, pThrd);
|
||||
if (code != 0) {
|
||||
tError("failed to sched msg to next node, reason:%s", tstrerror(code));
|
||||
tError("failed to sched msg to next node since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
return 0;
|
||||
|
@ -2657,7 +2657,7 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
|
|||
code = cliRetryDoSched(pReq, pThrd);
|
||||
if (code != 0) {
|
||||
pResp->code = code;
|
||||
tError("failed to sched msg to next node, reason:%s", tstrerror(code));
|
||||
tError("failed to sched msg to next node since %s", tstrerror(code));
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -2756,7 +2756,7 @@ void transCloseClient(void* arg) {
|
|||
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||
code = cliSendQuit(cli->pThreadObj[i]);
|
||||
if (code != 0) {
|
||||
tError("failed to send quit to thread:%d, reason:%s", i, tstrerror(code));
|
||||
tError("failed to send quit to thread:%d since %s", i, tstrerror(code));
|
||||
}
|
||||
|
||||
destroyThrdObj(cli->pThreadObj[i]);
|
||||
|
@ -3317,14 +3317,14 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea
|
|||
SHeap heap = {0};
|
||||
code = transHeapInit(&heap, compareHeapNode);
|
||||
if (code != 0) {
|
||||
tError("failed to init heap cache for key:%s, reason: %s", key, tstrerror(code));
|
||||
tError("failed to init heap cache for key:%s since %s", key, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
code = taosHashPut(pConnHeapCache, key, klen, &heap, sizeof(heap));
|
||||
if (code != 0) {
|
||||
transHeapDestroy(&heap);
|
||||
tError("failed to put heap to cache for key:%s, reason: %s", key, tstrerror(code));
|
||||
tError("failed to put heap to cache for key:%s since %s", key, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
p = taosHashGet(pConnHeapCache, key, klen);
|
||||
|
|
|
@ -184,7 +184,7 @@ int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
tError("failed to reset buffer, total:%d, len:%d, reason:%s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
|
||||
tError("failed to reset buffer, total:%d, len:%d since %s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
return 0;
|
||||
|
@ -281,7 +281,7 @@ int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAs
|
|||
async->data = item;
|
||||
err = uv_async_init(loop, async, cb);
|
||||
if (err != 0) {
|
||||
tError("failed to init async, reason:%s", uv_err_name(err));
|
||||
tError("failed to init async since %s", uv_err_name(err));
|
||||
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||
break;
|
||||
}
|
||||
|
@ -342,7 +342,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
|
|||
|
||||
int ret = uv_async_send(async);
|
||||
if (ret != 0) {
|
||||
tError("failed to send async,reason:%s", uv_err_name(ret));
|
||||
tError("failed to send async since %s", uv_err_name(ret));
|
||||
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||
}
|
||||
return 0;
|
||||
|
@ -389,7 +389,7 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) {
|
|||
|
||||
int32_t code = taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
|
||||
if (code != 0) {
|
||||
tError("failed to put val to hash, reason:%s", tstrerror(code));
|
||||
tError("failed to put val to hash since %s", tstrerror(code));
|
||||
}
|
||||
iter = taosHashIterate(src->args, iter);
|
||||
}
|
||||
|
@ -821,7 +821,7 @@ int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
|
|||
|
||||
int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32);
|
||||
if (err != 0) {
|
||||
tError("failed to convert ip to string, reason:%s", uv_strerror(err));
|
||||
tError("failed to convert ip to string since %s", uv_strerror(err));
|
||||
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||
}
|
||||
|
||||
|
|
|
@ -690,8 +690,8 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
|||
|
||||
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
|
||||
STraceId* trace = &smsg->msg.info.traceId;
|
||||
tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", qid:%" PRId64 ", reason:%s", transLabel(conn->pInst),
|
||||
conn, smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
|
||||
tGDebug("%s conn %p failed to send, seqNum:%" PRId64 ", qid:%" PRId64 " since %s", transLabel(conn->pInst), conn,
|
||||
smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
|
||||
destroySmsg(smsg);
|
||||
}
|
||||
|
||||
|
@ -705,7 +705,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
|||
if (status == 0) {
|
||||
tTrace("success to dispatch conn to work thread");
|
||||
} else {
|
||||
tError("fail to dispatch conn to work thread, reason:%s", uv_strerror(status));
|
||||
tError("fail to dispatch conn to work thread since %s", uv_strerror(status));
|
||||
}
|
||||
if (!uv_is_closing((uv_handle_t*)req->data)) {
|
||||
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
||||
|
@ -844,7 +844,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
|
|||
|
||||
int32_t ret = uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
|
||||
if (ret != 0) {
|
||||
tError("conn %p failed to write data, reason:%s", pConn, uv_err_name(ret));
|
||||
tError("conn %p failed to write data since %s", pConn, uv_err_name(ret));
|
||||
pConn->broken = true;
|
||||
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
|
||||
queue* head = QUEUE_HEAD(&pWreq->node);
|
||||
|
@ -1063,7 +1063,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
tError("read error %s", uv_err_name(nread));
|
||||
}
|
||||
// TODO(log other failure reason)
|
||||
tWarn("failed to create connect:%p, reason: %s", q, uv_err_name(nread));
|
||||
tWarn("failed to create connect:%p since %s", q, uv_err_name(nread));
|
||||
taosMemoryFree(buf->base);
|
||||
uv_close((uv_handle_t*)q, NULL);
|
||||
return;
|
||||
|
@ -1504,7 +1504,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
|
||||
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
||||
tError("invalid ip/port, %d:%d since %s", srv->ip, srv->port, terrstr());
|
||||
goto End;
|
||||
}
|
||||
char pipeName[PATH_MAX];
|
||||
|
|
Loading…
Reference in New Issue