refactor transport

This commit is contained in:
Yihao Deng 2024-07-22 06:50:01 +00:00
parent 9936e965cf
commit ee7b67e018
7 changed files with 397 additions and 148 deletions

View File

@ -90,6 +90,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)
#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025)
#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026)
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)

View File

@ -103,11 +103,11 @@ typedef void* queue[2];
#define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef struct SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal;
typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo;
typedef struct SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal;
typedef SRpcInfo STrans;
typedef SRpcConnInfo STransHandleInfo;
// ref mgt handle
typedef struct SExHandle {
@ -250,10 +250,10 @@ typedef struct {
int8_t stop;
} SAsyncPool;
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool);
void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
do { \

View File

@ -76,9 +76,9 @@ static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
static void httpDestroyMsg(SHttpMsg* msg);
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
static void httpDestroyMsg(SHttpMsg* msg);
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
@ -91,27 +91,27 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) {
int32_t code = 0;
EHttpCompFlag flag) {
int32_t code = 0;
int32_t len = 0;
if (flag == HTTP_FLAT) {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
uri, server, contLen);
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
uri, server, contLen);
if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE;
}
} else if (flag == HTTP_GZIP) {
len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
uri, server, contLen);
"POST %s HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
uri, server, contLen);
if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE;
}
@ -127,7 +127,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
void* pDest = taosMemoryMalloc(destLen);
if (pDest == NULL) {
code= TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
@ -184,7 +184,7 @@ _OVER:
if (code == 0) {
memcpy(pSrc, pDest, gzipStream.total_out);
code = gzipStream.total_out;
}
}
taosMemoryFree(pDest);
return code;
}
@ -635,8 +635,8 @@ void httpModuleDestroy2(SHttpModule* http) {
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
SHttpModule* load = NULL;
SHttpMsg *msg = NULL;
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg);
SHttpMsg* msg = NULL;
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg);
if (code != 0) {
goto _ERROR;
}
@ -718,9 +718,8 @@ int64_t transInitHttpChanImpl() {
goto _ERROR;
}
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (http->asyncPool == NULL) {
code = terrno;
code = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb, &http->asyncPool);
if (code != 0) {
goto _ERROR;
}

View File

@ -2063,6 +2063,7 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
}
static SCliThrd* createThrdObj(void* trans) {
int32_t code = 0;
STrans* pTransInst = trans;
SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd));
@ -2080,10 +2081,9 @@ static SCliThrd* createThrdObj(void* trans) {
return NULL;
}
int32_t nSync = pTransInst->supportBatch ? 4 : 8;
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb);
if (pThrd->asyncPool == NULL) {
tError("failed to init async pool");
code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool);
if (code != 0) {
tError("failed to init async pool since:%s", tstrerror(code));
uv_loop_close(pThrd->loop);
taosMemoryFree(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
@ -2561,9 +2561,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
return pThrd;
}
int transReleaseCliHandle(void* handle) {
int idx = -1;
bool valid = false;
int32_t code = 0;
SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle);
if (pThrd == NULL) {
return TSDB_CODE_RPC_BROKEN_LINK;
@ -2573,9 +2571,17 @@ int transReleaseCliHandle(void* handle) {
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
if (pCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCtx->ahandle = tmsg.info.ahandle;
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (cmsg == NULL) {
taosMemoryFree(pCtx);
return TSDB_CODE_OUT_OF_MEMORY;
}
cmsg->msg = tmsg;
cmsg->st = taosGetTimestampUs();
cmsg->type = Release;
@ -2584,15 +2590,19 @@ int transReleaseCliHandle(void* handle) {
STraceId* trace = &tmsg.info.traceId;
tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
if ((code = transAsyncSend(pThrd->asyncPool, &cmsg->q)) != 0) {
destroyCmsg(cmsg);
return TSDB_CODE_RPC_BROKEN_LINK;
return code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code;
}
return 0;
return code;
}
static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) {
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
if (pCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
epsetAssign(&pCtx->epSet, pEpSet);
epsetAssign(&pCtx->origEpSet, pEpSet);
@ -2602,13 +2612,20 @@ static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pRe
if (ctx != NULL) pCtx->appCtx = *ctx;
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (cliMsg == NULL) {
taosMemoryFree(pCtx);
return TSDB_CODE_OUT_OF_MEMORY;
}
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
QUEUE_INIT(&cliMsg->seqq);
return cliMsg;
*pCliMsg = cliMsg;
return 0;
}
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
@ -2617,7 +2634,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transFreeMsg(pReq->pCont);
return TSDB_CODE_RPC_BROKEN_LINK;
}
int32_t code = 0;
int64_t handle = (int64_t)pReq->info.handle;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle);
if (pThrd == NULL) {
@ -2630,7 +2647,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
if (exh != NULL) {
taosWLockLatch(&exh->latch);
if (exh->handle == NULL && exh->inited != 0) {
SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx);
SCliMsg* pCliMsg = NULL;
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
ASSERT(code == 0);
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
taosWUnLockLatch(&exh->latch);
tDebug("msg refId: %" PRId64 "", handle);
@ -2642,43 +2662,65 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle(transGetRefMgt(), handle);
}
}
SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx);
SCliMsg* pCliMsg = NULL;
code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg);
if (code != 0) {
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return code;
}
STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle);
if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) {
if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) {
destroyCmsg(pCliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
}
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg));
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransRsp);
return TSDB_CODE_RPC_BROKEN_LINK;
}
int32_t code = 0;
STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransRsp == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1);
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransRsp);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN1);
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
if (sem == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1);
}
code = tsem_init(sem, 0, 0);
if (code != 0) {
taosMemoryFree(sem);
code = TAOS_SYSTEM_ERROR(errno);
TAOS_CHECK_GOTO(code, NULL, _RETURN1);
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
if (pCtx == NULL) {
sem_destroy(sem);
taosMemoryFree(sem);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1);
}
epsetAssign(&pCtx->epSet, pEpSet);
epsetAssign(&pCtx->origEpSet, pEpSet);
pCtx->ahandle = pReq->info.ahandle;
@ -2687,6 +2729,13 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
pCtx->pRsp = pTransRsp;
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (cliMsg == NULL) {
sem_destroy(sem);
taosMemoryFree(sem);
taosMemoryFree(pCtx);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1);
}
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
@ -2697,11 +2746,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) {
code = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (code != 0) {
destroyCmsg(cliMsg);
ret = TSDB_CODE_RPC_BROKEN_LINK;
goto _RETURN;
TAOS_CHECK_GOTO((code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code), NULL, _RETURN);
}
tsem_wait(sem);
@ -2712,13 +2760,28 @@ _RETURN:
taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
taosMemoryFree(pTransRsp);
return ret;
return code;
_RETURN1:
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
taosMemoryFree(pTransRsp);
taosMemoryFree(pReq->pCont);
return code;
}
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) {
int32_t code = 0;
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
tsem2_init(sem, 0, 0);
if (sem == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tsem2_init(sem, 0, 0) != 0) {
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _EXIT);
}
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
if (pSyncMsg == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _EXIT);
}
taosInitRWLatch(&pSyncMsg->latch);
pSyncMsg->inited = 0;
@ -2726,39 +2789,69 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
pSyncMsg->pSem = sem;
pSyncMsg->hasEpSet = 0;
return taosAddRef(transGetSyncMsgMgt(), pSyncMsg);
int64_t id = taosAddRef(transGetSyncMsgMgt(), pSyncMsg);
if (id < 0) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _EXIT);
} else {
*refId = id;
}
return 0;
_EXIT:
tsem2_destroy(sem);
taosMemoryFree(sem);
taosMemoryFree(pSyncMsg);
return code;
}
int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
int32_t code = 0;
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg);
return TSDB_CODE_RPC_BROKEN_LINK;
}
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransMsg == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2);
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN2);
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
if (pCtx == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2);
}
epsetAssign(&pCtx->epSet, pEpSet);
epsetAssign(&pCtx->origEpSet, pEpSet);
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg);
if ((code = transCreateSyncMsg(pTransMsg, &pCtx->syncMsgRef)) != 0) {
taosMemoryFree(pCtx);
TAOS_CHECK_GOTO(code, NULL, _RETURN2);
}
int64_t ref = pCtx->syncMsgRef;
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref);
if (pSyncMsg == NULL) {
taosMemoryFree(pCtx);
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _RETURN2);
}
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (cliMsg == NULL) {
taosMemoryFree(pCtx);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2);
}
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
@ -2769,17 +2862,17 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) {
code = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (code != 0) {
destroyCmsg(cliMsg);
ret = TSDB_CODE_RPC_BROKEN_LINK;
TAOS_CHECK_GOTO(code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code, NULL, _RETURN);
goto _RETURN;
}
ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs);
if (ret < 0) {
code = tsem2_timewait(pSyncMsg->pSem, timeoutMs);
if (code < 0) {
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
ret = TSDB_CODE_TIMEOUT_ERROR;
code = TSDB_CODE_TIMEOUT_ERROR;
} else {
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
pSyncMsg->pRsp->pCont = NULL;
@ -2787,13 +2880,18 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
epsetAssign(pEpSet, &pSyncMsg->epSet);
*epUpdated = 1;
}
ret = 0;
code = 0;
}
_RETURN:
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
taosReleaseRef(transGetSyncMsgMgt(), ref);
taosRemoveRef(transGetSyncMsgMgt(), ref);
return ret;
return code;
_RETURN2:
transFreeMsg(pReq->pCont);
taosMemoryFree(pTransMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return code;
}
/*
*
@ -2810,11 +2908,24 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn));
cvtAddr.cvt = true;
}
for (int i = 0; i < pTransInst->numOfThreads; i++) {
int32_t code = 0;
int8_t i = 0;
for (i = 0; i < pTransInst->numOfThreads; i++) {
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
if (pCtx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
pCtx->cvtAddr = cvtAddr;
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (cliMsg == NULL) {
taosMemoryFree(pCtx);
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
cliMsg->ctx = pCtx;
cliMsg->type = Update;
cliMsg->refId = (int64_t)shandle;
@ -2822,21 +2933,30 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) {
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
break;
}
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
return code;
}
int64_t transAllocHandle() {
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
if (exh == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
exh->refId = transAddExHandle(transGetRefMgt(), exh);
if (exh->refId < 0) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId);
ASSERT(exh == self);
if (exh != self) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;

View File

@ -67,7 +67,11 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
STransCompMsg* pComp = (STransCompMsg*)pCont;
int32_t oriLen = htonl(pComp->contLen);
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STransMsgHead* pNewHead = (STransMsgHead*)buf;
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
@ -78,7 +82,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
taosMemoryFree(pHead);
*msg = buf;
if (decompLen != oriLen) {
return -1;
return TSDB_CODE_INVALID_MSG;
}
return 0;
}
@ -222,19 +226,19 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) {
// int ret = uv_tcp_keepalive(stream, 5, 60);
}
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) {
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
if (pool == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
// return NULL;
}
int32_t code = 0;
pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
if (pool->asyncs == NULL) {
taosMemoryFree(pool);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
int i = 0, err = 0;
@ -243,7 +247,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
if (item == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
item->pThrd = arg;
@ -254,7 +258,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
err = uv_async_init(loop, async, cb);
if (err != 0) {
tError("failed to init async, reason:%s", uv_err_name(err));
terrno = TSDB_CODE_THIRDPARTY_ERROR;
code = TSDB_CODE_THIRDPARTY_ERROR;
break;
}
}
@ -264,7 +268,9 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
pool = NULL;
}
return pool;
*pPool = pool;
return 0;
// return pool;
}
void transAsyncPoolDestroy(SAsyncPool* pool) {
@ -289,7 +295,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
}
int transAsyncSend(SAsyncPool* pool, queue* q) {
if (atomic_load_8(&pool->stop) == 1) {
return -1;
return TSDB_CODE_RPC_ASYNC_MODULE_QUIT;
}
int idx = pool->index % pool->nAsync;
@ -303,7 +309,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
taosThreadMutexLock(&item->mtx);
QUEUE_PUSH(&item->qmsg, q);
taosThreadMutexUnlock(&item->mtx);
return uv_async_send(async);
int ret = uv_async_send(async);
if (ret != 0) {
tError("failed to send async,reason:%s", uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
return 0;
}
void transCtxInit(STransCtx* ctx) {

View File

@ -181,8 +181,8 @@ static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
// add handle loop
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg);
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static int32_t addHandleToAcceptloop(void* arg);
#define SRV_RELEASE_UV(loop) \
do { \
@ -221,8 +221,16 @@ static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) {
}
SIpWhiteListTab* uvWhiteListCreate() {
SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab));
if (pWhiteList == NULL) {
return NULL;
}
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
if (pWhiteList->pList == NULL) {
taosMemoryFree(pWhiteList);
return NULL;
}
pWhiteList->ver = -1;
return pWhiteList;
}
@ -1005,17 +1013,36 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
}
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
int32_t code = 0;
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (0 != uv_loop_init(pThrd->loop)) {
return false;
if (pThrd->loop == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if ((code = uv_loop_init(pThrd->loop)) != 0) {
tError("failed to init loop since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
#if defined(WINDOWS) || defined(DARWIN)
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
if (code != 0) {
tError("failed to init pip since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
#else
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd);
code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
if (code != 0) {
tError("failed to init pip since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
code = uv_pipe_open(pThrd->pipe, pThrd->fd);
if (code != 0) {
tError("failed to open pip since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
#endif
pThrd->pipe->data = pThrd;
@ -1023,50 +1050,90 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
QUEUE_INIT(&pThrd->msg);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
uv_prepare_start(pThrd->prepare, uvPrepareCb);
if (pThrd->prepare == NULL) {
tError("failed to init prepare");
return TSDB_CODE_OUT_OF_MEMORY;
}
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
if (code != 0) {
tError("failed to init prepare since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
code = uv_prepare_start(pThrd->prepare, uvPrepareCb);
if (code != 0) {
tError("failed to start prepare since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
pThrd->prepare->data = pThrd;
// conn set
QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb);
code = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb, &pThrd->asyncPool);
if (code != 0) {
tError("failed to init async pool since:%s", tstrerror(code));
return code;
}
#if defined(WINDOWS) || defined(DARWIN)
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
code = uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
if (code != 0) {
tError("failed to start connect pipe:%s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
#else
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
if (code != 0) {
tError("failed to start read pipe:%s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
#endif
return true;
return 0;
}
static bool addHandleToAcceptloop(void* arg) {
static int32_t addHandleToAcceptloop(void* arg) {
// impl later
SServerObj* srv = arg;
int err = 0;
if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) {
tError("failed to init accept server:%s", uv_err_name(err));
return false;
int code = 0;
if ((code = uv_tcp_init(srv->loop, &srv->server)) != 0) {
tError("failed to init accept server since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
// register an async here to quit server gracefully
srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t));
uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
if (srv->pAcceptAsync == NULL) {
tError("failed to create async since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
code = uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
if (code != 0) {
tError("failed to init async since:%s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
srv->pAcceptAsync->data = srv;
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
tError("failed to bind:%s", uv_err_name(err));
return false;
if ((code = uv_ip4_addr("0.0.0.0", srv->port, &bind_addr)) != 0) {
tError("failed to bind addr since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
if ((err = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) {
tError("failed to listen:%s", uv_err_name(err));
terrno = TSDB_CODE_RPC_PORT_EADDRINUSE;
return false;
if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
tError("failed to bind since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
return true;
if ((code = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) {
tError("failed to listen since %s", uv_err_name(code));
return TSDB_CODE_RPC_PORT_EADDRINUSE;
}
return 0;
}
void* transWorkerThread(void* arg) {
setThreadName("trans-svr-work");
SWorkThrd* pThrd = (SWorkThrd*)arg;
@ -1079,6 +1146,9 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
SWorkThrd* pThrd = hThrd;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
if (pConn == NULL) {
return NULL;
}
transReqQueueInit(&pConn->wreqQueue);
QUEUE_INIT(&pConn->queue);
@ -1204,24 +1274,41 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
}
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
int32_t code = 0;
SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
if (srv == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tError("failed to init server since: %s", tstrerror(code));
return NULL;
}
srv->ip = ip;
srv->port = port;
srv->numOfThreads = numOfThreads;
srv->workerIdx = 0;
srv->numOfWorkerReady = 0;
srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*));
srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*));
srv->ip = ip;
srv->port = port;
uv_loop_init(srv->loop);
if (srv->loop == NULL || srv->pThreadObj == NULL || srv->pipe == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
char pipeName[PATH_MAX];
code = uv_loop_init(srv->loop);
if (code != 0) {
tError("failed to init server since: %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
goto End;
}
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno);
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
goto End;
}
char pipeName[PATH_MAX];
#if defined(WINDOWS) || defined(DARWIN)
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
@ -1259,7 +1346,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read
if (false == addHandleToWorkloop(thrd, pipeName)) {
if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
goto End;
}
@ -1276,27 +1363,53 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
if (thrd == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
thrd->pTransInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
srv->pThreadObj[i] = thrd;
uv_os_sock_t fds[2];
if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
if (thrd->pWhiteList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
if (srv->pipe[i] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
srv->pThreadObj[i] = thrd;
uv_os_sock_t fds[2];
if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) {
tError("failed to create pipe, errmsg: %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
goto End;
}
code = uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
if (code != 0) {
tError("failed to init pipe, errmsg: %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
goto End;
}
code = uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
if (code != 0) {
tError("failed to init pipe, errmsg: %s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
goto End;
}
thrd->pipe = &(srv->pipe[i][1]); // init read
thrd->fd = fds[0];
if (false == addHandleToWorkloop(thrd, pipeName)) {
if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) {
goto End;
}
@ -1311,15 +1424,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
#endif
if (false == addHandleToAcceptloop(srv)) {
if ((code = addHandleToAcceptloop(srv)) != 0) {
goto End;
}
int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
if (err == 0) {
code = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
if (code == 0) {
tDebug("success to create accept-thread");
} else {
tError("failed to create accept-thread");
code = TAOS_SYSTEM_ERROR(errno);
tError("failed to create accept-thread since %s", tstrerror(code));
goto End;
// clear all resource later
}

View File

@ -55,8 +55,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")