Merge pull request #27819 from taosdata/fix/TD-31771-3.0
fix: memleak in taos (create table)
This commit is contained in:
commit
8c1a04138a
|
@ -46,6 +46,7 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid);
|
||||||
// release ref, rid is the reference ID returned by taosAddRef
|
// release ref, rid is the reference ID returned by taosAddRef
|
||||||
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
|
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
|
||||||
int32_t taosReleaseRef(int32_t rsetId, int64_t rid);
|
int32_t taosReleaseRef(int32_t rsetId, int64_t rid);
|
||||||
|
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased);
|
||||||
|
|
||||||
// return the first reference if rid is 0, otherwise return the next after current reference.
|
// return the first reference if rid is 0, otherwise return the next after current reference.
|
||||||
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
|
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
|
||||||
|
|
|
@ -1165,6 +1165,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pResult);
|
taosMemoryFree(pResult);
|
||||||
|
|
|
@ -13811,7 +13811,6 @@ _OUT:
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SParseFileContext {
|
typedef struct SParseFileContext {
|
||||||
SHashObj* pTbNameHash;
|
|
||||||
SArray* aTagNames;
|
SArray* aTagNames;
|
||||||
bool tagNameFilled;
|
bool tagNameFilled;
|
||||||
STableMeta* pStbMeta;
|
STableMeta* pStbMeta;
|
||||||
|
@ -13946,18 +13945,6 @@ static int32_t parseCsvFile(SMsgBuf* pMsgBuf, SParseContext* pParseCxt, SParseFi
|
||||||
|
|
||||||
code = parseOneStbRow(pMsgBuf, pParFileCxt);
|
code = parseOneStbRow(pMsgBuf, pParFileCxt);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
if (taosHashGet(pParFileCxt->pTbNameHash, pParFileCxt->ctbName.tname, strlen(pParFileCxt->ctbName.tname) + 1) !=
|
|
||||||
NULL) {
|
|
||||||
taosMemoryFreeClear(pParFileCxt->pTag);
|
|
||||||
code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_TBNAME_DUPLICATED, pParFileCxt->ctbName.tname);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = taosHashPut(pParFileCxt->pTbNameHash, pParFileCxt->ctbName.tname, strlen(pParFileCxt->ctbName.tname) + 1,
|
|
||||||
NULL, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = fillVgroupInfo(pParseCxt, &pParFileCxt->ctbName, &pParFileCxt->vg);
|
code = fillVgroupInfo(pParseCxt, &pParFileCxt->ctbName, &pParFileCxt->vg);
|
||||||
}
|
}
|
||||||
|
@ -13997,7 +13984,6 @@ static void destructParseFileContext(SParseFileContext** ppParFileCxt) {
|
||||||
|
|
||||||
SParseFileContext* pParFileCxt = *ppParFileCxt;
|
SParseFileContext* pParFileCxt = *ppParFileCxt;
|
||||||
|
|
||||||
taosHashCleanup(pParFileCxt->pTbNameHash);
|
|
||||||
taosArrayDestroy(pParFileCxt->aTagNames);
|
taosArrayDestroy(pParFileCxt->aTagNames);
|
||||||
taosMemoryFreeClear(pParFileCxt->pStbMeta);
|
taosMemoryFreeClear(pParFileCxt->pStbMeta);
|
||||||
taosArrayDestroy(pParFileCxt->aTagIndexs);
|
taosArrayDestroy(pParFileCxt->aTagIndexs);
|
||||||
|
@ -14022,15 +14008,6 @@ static int32_t constructParseFileContext(SCreateSubTableFromFileClause* pStmt, S
|
||||||
pParFileCxt->ctbName.acctId = acctId;
|
pParFileCxt->ctbName.acctId = acctId;
|
||||||
strcpy(pParFileCxt->ctbName.dbname, pStmt->useDbName);
|
strcpy(pParFileCxt->ctbName.dbname, pStmt->useDbName);
|
||||||
|
|
||||||
if (NULL == pParFileCxt->pTbNameHash) {
|
|
||||||
pParFileCxt->pTbNameHash =
|
|
||||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
|
||||||
if (!pParFileCxt->pTbNameHash) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _ERR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == pParFileCxt->aTagNames) {
|
if (NULL == pParFileCxt->aTagNames) {
|
||||||
pParFileCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
pParFileCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||||
if (NULL == pParFileCxt->aTagNames) {
|
if (NULL == pParFileCxt->aTagNames) {
|
||||||
|
|
|
@ -572,6 +572,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
|
||||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
|
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
|
||||||
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
|
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
|
||||||
int32_t schReleaseJob(int64_t refId);
|
int32_t schReleaseJob(int64_t refId);
|
||||||
|
int32_t schReleaseJobEx(int64_t refId, int32_t* released);
|
||||||
void schFreeFlowCtrl(SSchJob *pJob);
|
void schFreeFlowCtrl(SSchJob *pJob);
|
||||||
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
||||||
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
|
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
|
|
@ -41,6 +41,15 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
|
||||||
return taosReleaseRef(schMgmt.jobRef, refId);
|
return taosReleaseRef(schMgmt.jobRef, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t* released) {
|
||||||
|
if (0 == refId) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("sch release ex jobId:0x%" PRIx64, refId);
|
||||||
|
return taosReleaseRefEx(schMgmt.jobRef, refId, released);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) {
|
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) {
|
||||||
*ppRes = NULL;
|
*ppRes = NULL;
|
||||||
if (NULL == pEpSet) {
|
if (NULL == pEpSet) {
|
||||||
|
|
|
@ -179,9 +179,12 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
|
||||||
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode));
|
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode));
|
||||||
(void)schHandleJobDrop(pJob, errCode); // ignore any error
|
(void)schHandleJobDrop(pJob, errCode); // ignore any error
|
||||||
|
|
||||||
(void)schReleaseJob(*jobId); // ignore error
|
int32_t released = false;
|
||||||
|
(void)schReleaseJobEx(*jobId, &released); // ignore error
|
||||||
|
if (released) {
|
||||||
*jobId = 0;
|
*jobId = 0;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void schedulerDestroy(void) {
|
void schedulerDestroy(void) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -56,7 +56,7 @@ static void taosLockList(int64_t *lockedBy);
|
||||||
static void taosUnlockList(int64_t *lockedBy);
|
static void taosUnlockList(int64_t *lockedBy);
|
||||||
static void taosIncRsetCount(SRefSet *pSet);
|
static void taosIncRsetCount(SRefSet *pSet);
|
||||||
static void taosDecRsetCount(SRefSet *pSet);
|
static void taosDecRsetCount(SRefSet *pSet);
|
||||||
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove);
|
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased);
|
||||||
|
|
||||||
int32_t taosOpenRef(int32_t max, RefFp fp) {
|
int32_t taosOpenRef(int32_t max, RefFp fp) {
|
||||||
SRefNode **nodeList;
|
SRefNode **nodeList;
|
||||||
|
@ -182,7 +182,7 @@ int64_t taosAddRef(int32_t rsetId, void *p) {
|
||||||
return rid;
|
return rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1); }
|
int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1, NULL); }
|
||||||
|
|
||||||
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
||||||
void *taosAcquireRef(int32_t rsetId, int64_t rid) {
|
void *taosAcquireRef(int32_t rsetId, int64_t rid) {
|
||||||
|
@ -251,7 +251,8 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0); }
|
int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0, NULL); }
|
||||||
|
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased) { return taosDecRefCount(rsetId, rid, 0, isReleased); }
|
||||||
|
|
||||||
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
||||||
void *taosIterateRef(int32_t rsetId, int64_t rid) {
|
void *taosIterateRef(int32_t rsetId, int64_t rid) {
|
||||||
|
@ -384,7 +385,7 @@ int32_t taosListRef() {
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
|
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased) {
|
||||||
int32_t hash;
|
int32_t hash;
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
|
@ -458,6 +459,10 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
|
||||||
taosDecRsetCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isReleased) {
|
||||||
|
*isReleased = released;
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue