refactor transport

This commit is contained in:
Yihao Deng 2024-07-24 01:00:20 +00:00
parent 6eb9774edb
commit bd2e129686
1 changed files with 33 additions and 25 deletions

View File

@ -114,7 +114,7 @@ typedef struct SCliThrd {
SDelayQueue* timeoutQueue;
SDelayQueue* waitConnQueue;
uint64_t nextTimeout; // next timeout
void* pTransInst; //
STrans* pTransInst; //
int connCount;
void (*destroyAhandleFp)(void* ahandle);
@ -2052,12 +2052,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
}
static void* cliWorkThread(void* arg) {
char threadName[TSDB_LABEL_LEN] = {0};
SCliThrd* pThrd = (SCliThrd*)arg;
pThrd->pid = taosGetSelfPthreadId();
char threadName[TSDB_LABEL_LEN] = {0};
STrans* pInst = pThrd->pTransInst;
strtolower(threadName, pInst->label);
strtolower(threadName, pThrd->pTransInst->label);
setThreadName(threadName);
uv_run(pThrd->loop, UV_RUN_DEFAULT);
@ -2121,11 +2121,9 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
taosMemoryFree(pMsg);
}
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
SCliMsg* pMsg = arg;
if (pMsg == NULL) {
return;
}
if (arg == NULL) return;
SCliMsg* pMsg = arg;
SCliThrd* pThrd = param;
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) {
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
@ -2168,8 +2166,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
code = uv_loop_init(pThrd->loop);
if (code != 0) {
tError("failed to init uv_loop, reason:%s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
}
int32_t nSync = pTransInst->supportBatch ? 4 : 8;
@ -2188,8 +2185,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
if (code != 0) {
tError("failed to create prepre since:%s", uv_err_name(code));
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
}
pThrd->prepare->data = pThrd;
@ -2197,14 +2193,13 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
if (pThrd->timerList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
uv_timer_init(pThrd->loop, timer);
taosArrayPush(pThrd->timerList, &timer);
@ -2213,7 +2208,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
pThrd->pool = createConnPool(4);
if (pThrd->pool == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
if ((code = transDQCreate(pThrd->loop, &pThrd->delayQueue)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end);
@ -2230,19 +2225,16 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->fqdn2ipCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->failFastCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->batchCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _end);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
@ -2324,12 +2316,23 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) {
taosMemoryFree(ctx);
}
void cliSendQuit(SCliThrd* thrd) {
int32_t cliSendQuit(SCliThrd* thrd) {
// cli can stop gracefully
int32_t code = 0;
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
if (msg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
msg->type = Quit;
transAsyncSend(thrd->asyncPool, &msg->q);
if ((code = transAsyncSend(thrd->asyncPool, &msg->q)) != 0) {
code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
taosMemoryFree(msg);
return code;
}
atomic_store_8(&thrd->asyncPool->stop, 1);
return 0;
}
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
@ -2653,9 +2656,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
void transCloseClient(void* arg) {
int32_t code = 0;
SCliObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) {
cliSendQuit(cli->pThreadObj[i]);
code = cliSendQuit(cli->pThreadObj[i]);
if (code != 0) {
tError("failed to send quit to thread:%d, reason:%s", i, tstrerror(code));
}
destroyThrdObj(cli->pThreadObj[i]);
}
taosMemoryFree(cli->pThreadObj);