fix: fix fd limit crash

This commit is contained in:
yihaoDeng 2023-02-03 12:08:16 +08:00
parent b1192d5084
commit cdf362b588
3 changed files with 38 additions and 7 deletions

View File

@ -1460,6 +1460,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
tscError("failed to sched msg to tsc, tsc ready to quit");
rpcFreeCont(pMsg->pCont);
taosMemoryFree(arg->pEpset);
destroySendMsgInfo(pMsg->info.ahandle);
taosMemoryFree(arg);
}
}

View File

@ -1275,7 +1275,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrd* pThrd = createThrdObj(shandle);
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
if (pThrd == NULL) {
return NULL;
}
int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
if (err == 0) {
tDebug("success to create tranport-cli thread:%d", i);
}
@ -1332,9 +1336,22 @@ static SCliThrd* createThrdObj(void* trans) {
taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
int err = uv_loop_init(pThrd->loop);
if (err != 0) {
tError("failed to init uv_loop, reason:%s", uv_err_name(err));
taosMemoryFree(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
taosMemoryFree(pThrd);
return NULL;
}
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb);
if (pThrd->asyncPool == NULL) {
uv_loop_close(pThrd->loop);
taosMemoryFree(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
taosMemoryFree(pThrd);
return NULL;
}
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);

View File

@ -214,24 +214,37 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
for (int i = 0; i < pool->nAsync; i++) {
int i = 0, err = 0;
for (i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem));
item->pThrd = arg;
QUEUE_INIT(&item->qmsg);
taosThreadMutexInit(&item->mtx, NULL);
uv_async_t* async = &(pool->asyncs[i]);
uv_async_init(loop, async, cb);
async->data = item;
err = uv_async_init(loop, async, cb);
if (err != 0) {
tError("failed to init async, reason:%s", uv_err_name(err));
break;
}
}
if (i != pool->nAsync) {
transAsyncPoolDestroy(pool);
pool = NULL;
}
return pool;
}
void transAsyncPoolDestroy(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (item == NULL) continue;
taosThreadMutexDestroy(&item->mtx);
taosMemoryFree(item);
}