fix: memory leak
This commit is contained in:
parent
cb1318a76b
commit
ef05b1bc95
|
@ -218,6 +218,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
||||||
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
|
||||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
||||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
||||||
|
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
||||||
|
|
||||||
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
|
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
|
||||||
SMqRspObj* msg = (SMqRspObj*)res;
|
SMqRspObj* msg = (SMqRspObj*)res;
|
||||||
|
|
|
@ -30,15 +30,15 @@
|
||||||
#define TSC_VAR_RELEASED 0
|
#define TSC_VAR_RELEASED 0
|
||||||
|
|
||||||
SAppInfo appInfo;
|
SAppInfo appInfo;
|
||||||
int32_t clientReqRefPool = -1;
|
int32_t clientReqRefPool = -1;
|
||||||
int32_t clientConnRefPool = -1;
|
int32_t clientConnRefPool = -1;
|
||||||
|
|
||||||
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
|
||||||
volatile int32_t tscInitRes = 0;
|
volatile int32_t tscInitRes = 0;
|
||||||
|
|
||||||
static void registerRequest(SRequestObj *pRequest) {
|
static void registerRequest(SRequestObj *pRequest) {
|
||||||
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
|
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
|
||||||
|
|
||||||
assert(pTscObj != NULL);
|
assert(pTscObj != NULL);
|
||||||
|
|
||||||
// connection has been released already, abort creating request.
|
// connection has been released already, abort creating request.
|
||||||
|
@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) {
|
||||||
if (pTscObj->pAppInfo) {
|
if (pTscObj->pAppInfo) {
|
||||||
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
|
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
int32_t total = atomic_add_fetch_64((int64_t*)&pSummary->totalRequests, 1);
|
int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
|
||||||
int32_t currentInst = atomic_add_fetch_64((int64_t*)&pSummary->currentRequests, 1);
|
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
|
||||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
||||||
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||||
|
@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) {
|
||||||
static void deregisterRequest(SRequestObj *pRequest) {
|
static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
STscObj * pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
int32_t currentInst = atomic_sub_fetch_64((int64_t*)&pActivity->currentRequests, 1);
|
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
|
||||||
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
||||||
|
|
||||||
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
||||||
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
||||||
" ms, current:%d, app current:%d",
|
" ms, current:%d, app current:%d",
|
||||||
pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst);
|
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
|
||||||
releaseTscObj(pTscObj->id);
|
releaseTscObj(pTscObj->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,12 +109,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void closeAllRequests(SHashObj *pRequests) {
|
void closeAllRequests(SHashObj *pRequests) {
|
||||||
void *pIter = taosHashIterate(pRequests, NULL);
|
void *pIter = taosHashIterate(pRequests, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
int64_t *rid = pIter;
|
int64_t *rid = pIter;
|
||||||
|
|
||||||
releaseRequest(*rid);
|
releaseRequest(*rid);
|
||||||
|
|
||||||
pIter = taosHashIterate(pRequests, pIter);
|
pIter = taosHashIterate(pRequests, pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->pAppInfo = pAppInfo;
|
pObj->pAppInfo = pAppInfo;
|
||||||
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
||||||
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
|
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
|
||||||
|
@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
|
||||||
return pObj;
|
return pObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
STscObj *acquireTscObj(int64_t rid) {
|
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
|
||||||
return (STscObj *)taosAcquireRef(clientConnRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t releaseTscObj(int64_t rid) {
|
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
|
||||||
return taosReleaseRef(clientConnRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
|
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
|
||||||
assert(pObj != NULL);
|
assert(pObj != NULL);
|
||||||
|
@ -189,11 +185,11 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
|
||||||
tsem_init(&pRequest->body.rspSem, 0, 0);
|
tsem_init(&pRequest->body.rspSem, 0, 0);
|
||||||
|
|
||||||
registerRequest(pRequest);
|
registerRequest(pRequest);
|
||||||
|
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
||||||
taosMemoryFreeClear(pResInfo->pRspMsg);
|
taosMemoryFreeClear(pResInfo->pRspMsg);
|
||||||
taosMemoryFreeClear(pResInfo->length);
|
taosMemoryFreeClear(pResInfo->length);
|
||||||
taosMemoryFreeClear(pResInfo->row);
|
taosMemoryFreeClear(pResInfo->row);
|
||||||
|
@ -216,7 +212,7 @@ static void doDestroyRequest(void *p) {
|
||||||
assert(RID_VALID(pRequest->self));
|
assert(RID_VALID(pRequest->self));
|
||||||
|
|
||||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||||
|
|
||||||
taosMemoryFreeClear(pRequest->msgBuf);
|
taosMemoryFreeClear(pRequest->msgBuf);
|
||||||
taosMemoryFreeClear(pRequest->sqlstr);
|
taosMemoryFreeClear(pRequest->sqlstr);
|
||||||
taosMemoryFreeClear(pRequest->pDb);
|
taosMemoryFreeClear(pRequest->pDb);
|
||||||
|
@ -243,14 +239,9 @@ void destroyRequest(SRequestObj *pRequest) {
|
||||||
taosRemoveRef(clientReqRefPool, pRequest->self);
|
taosRemoveRef(clientReqRefPool, pRequest->self);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj *acquireRequest(int64_t rid) {
|
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
|
||||||
return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t releaseRequest(int64_t rid) {
|
|
||||||
return taosReleaseRef(clientReqRefPool, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
|
||||||
|
|
||||||
void taos_init_imp(void) {
|
void taos_init_imp(void) {
|
||||||
// In the APIs of other program language, taos_cleanup is not available yet.
|
// In the APIs of other program language, taos_cleanup is not available yet.
|
||||||
|
@ -379,7 +370,7 @@ uint64_t generateRequestId() {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t id = 0;
|
uint64_t id = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
int64_t ts = taosGetTimestampMs();
|
int64_t ts = taosGetTimestampMs();
|
||||||
uint64_t pid = taosGetPId();
|
uint64_t pid = taosGetPId();
|
||||||
|
|
|
@ -131,6 +131,16 @@ void taos_free_result(TAOS_RES *res) {
|
||||||
if (TD_RES_QUERY(res)) {
|
if (TD_RES_QUERY(res)) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
|
} else if (TD_RES_TMQ(res)) {
|
||||||
|
SMqRspObj *pRsp = (SMqRspObj *)res;
|
||||||
|
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||||
|
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||||
|
if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema);
|
||||||
|
if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName);
|
||||||
|
if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags);
|
||||||
|
if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema);
|
||||||
|
pRsp->resInfo.pRspMsg = NULL;
|
||||||
|
doFreeReqResultInfo(&pRsp->resInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
|
||||||
#if FILE_WITH_LOCK
|
#if FILE_WITH_LOCK
|
||||||
taosThreadRwlockWrlock(&(pFile->rwlock));
|
taosThreadRwlockWrlock(&(pFile->rwlock));
|
||||||
#endif
|
#endif
|
||||||
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/
|
assert(pFile->fd >= 0); // Please check if you have closed the file.
|
||||||
|
|
||||||
int64_t nleft = count;
|
int64_t nleft = count;
|
||||||
int64_t nwritten = 0;
|
int64_t nwritten = 0;
|
||||||
|
|
|
@ -221,7 +221,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
||||||
tsLogObj.logHandle->pFile = pFile;
|
tsLogObj.logHandle->pFile = pFile;
|
||||||
tsLogObj.lines = 0;
|
tsLogObj.lines = 0;
|
||||||
tsLogObj.openInProgress = 0;
|
tsLogObj.openInProgress = 0;
|
||||||
taosSsleep(3);
|
taosSsleep(10);
|
||||||
taosCloseLogByFd(pOldFile);
|
taosCloseLogByFd(pOldFile);
|
||||||
|
|
||||||
uInfo(" new log file:%d is opened", tsLogObj.flag);
|
uInfo(" new log file:%d is opened", tsLogObj.flag);
|
||||||
|
|
Loading…
Reference in New Issue