commit
9e8c63af31
|
@ -702,6 +702,7 @@ int32_t* taosGetErrno();
|
||||||
//index
|
//index
|
||||||
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -41,7 +41,7 @@ typedef struct SCliConn {
|
||||||
|
|
||||||
// debug and log info
|
// debug and log info
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
struct sockaddr_in locaddr;
|
struct sockaddr_in localAddr;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
|
@ -54,7 +54,8 @@ typedef struct SCliMsg {
|
||||||
} SCliMsg;
|
} SCliMsg;
|
||||||
|
|
||||||
typedef struct SCliThrdObj {
|
typedef struct SCliThrdObj {
|
||||||
TdThread thread;
|
TdThread thread; // tid
|
||||||
|
int64_t pid; // pid
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
uv_timer_t timer;
|
uv_timer_t timer;
|
||||||
|
@ -325,7 +326,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen);
|
||||||
|
|
||||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
tTrace("except, server continue send while cli ignore it");
|
tTrace("except, server continue send while cli ignore it");
|
||||||
|
@ -643,7 +644,7 @@ void cliSend(SCliConn* pConn) {
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port));
|
||||||
|
|
||||||
if (pHead->persist == 1) {
|
if (pHead->persist == 1) {
|
||||||
CONN_SET_PERSIST_BY_APP(pConn);
|
CONN_SET_PERSIST_BY_APP(pConn);
|
||||||
|
@ -668,8 +669,8 @@ void cliConnCb(uv_connect_t* req, int status) {
|
||||||
int addrlen = sizeof(pConn->addr);
|
int addrlen = sizeof(pConn->addr);
|
||||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
|
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
|
||||||
|
|
||||||
addrlen = sizeof(pConn->locaddr);
|
addrlen = sizeof(pConn->localAddr);
|
||||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen);
|
||||||
|
|
||||||
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
assert(pConn->stream == req->handle);
|
assert(pConn->stream == req->handle);
|
||||||
|
@ -742,8 +743,7 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
|
||||||
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
uint64_t et = taosGetTimestampUs();
|
uint64_t et = taosGetTimestampUs();
|
||||||
uint64_t el = et - pMsg->st;
|
uint64_t el = et - pMsg->st;
|
||||||
tTrace("%s cli msg tran time cost: %" PRIu64 "us, threadID: %" PRId64 "", ((STrans*)pThrd->pTransInst)->label, el,
|
// tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el);
|
||||||
pThrd->thread);
|
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
@ -822,6 +822,7 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg) {
|
static void* cliWorkThread(void* arg) {
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||||
|
pThrd->pid = taosGetSelfPthreadId();
|
||||||
setThreadName("trans-cli-work");
|
setThreadName("trans-cli-work");
|
||||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1089,7 +1090,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
|
|
||||||
tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
|
tDebug("send request at thread:%d, threadID: %08" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->pid, pReq,
|
||||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||||
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
|
||||||
}
|
}
|
||||||
|
@ -1118,7 +1119,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
||||||
cliMsg->type = Normal;
|
cliMsg->type = Normal;
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq,
|
tDebug("send request at thread:%d, threadID:%08" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->pid, pReq,
|
||||||
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||||
|
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
|
@ -1149,7 +1150,7 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
|
||||||
cliMsg->type = Update;
|
cliMsg->type = Update;
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
||||||
tDebug("update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread);
|
tDebug("update epset at thread:%d, threadID:%08" PRId64 "", i, thrd->pid);
|
||||||
|
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ typedef struct SSvrConn {
|
||||||
|
|
||||||
ConnStatus status;
|
ConnStatus status;
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
struct sockaddr_in locaddr;
|
struct sockaddr_in localAddr;
|
||||||
|
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
int spi;
|
int spi;
|
||||||
|
@ -286,12 +286,12 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
ntohs(pConn->localAddr.sin_port), transMsg.contLen);
|
||||||
} else {
|
} else {
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
||||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
|
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp);
|
||||||
// no ref here
|
// no ref here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,8 +454,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType),
|
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||||
ntohs(pConn->locaddr.sin_port), len);
|
ntohs(pConn->localAddr.sin_port), len);
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
|
|
||||||
wb->base = msg;
|
wb->base = msg;
|
||||||
|
@ -686,8 +686,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
addrlen = sizeof(pConn->locaddr);
|
addrlen = sizeof(pConn->localAddr);
|
||||||
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
|
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) {
|
||||||
tError("server conn %p failed to get local info", pConn);
|
tError("server conn %p failed to get local info", pConn);
|
||||||
transUnrefSrvHandle(pConn);
|
transUnrefSrvHandle(pConn);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -568,6 +568,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
|
||||||
|
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
|
|
Loading…
Reference in New Issue