[TD-98] fix memory leaks and invalid write
This commit is contained in:
parent
ebd6256ea9
commit
93bd14716c
|
@ -232,7 +232,7 @@ void tscDoQuery(SSqlObj* pSql);
|
||||||
* @param pPrevSql
|
* @param pPrevSql
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql);
|
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
|
||||||
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
|
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
|
||||||
|
|
||||||
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
|
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
|
||||||
|
|
|
@ -1070,14 +1070,14 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
||||||
* interrupted position.
|
* interrupted position.
|
||||||
*/
|
*/
|
||||||
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
|
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
|
||||||
tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
|
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql,
|
||||||
|
pos, pSql->asyncTblPos);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo add to return
|
// todo add to return
|
||||||
tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
|
tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code));
|
||||||
pSql->asyncTblPos = NULL;
|
pSql->asyncTblPos = NULL;
|
||||||
|
|
||||||
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
|
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -279,7 +279,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
|
||||||
assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns
|
assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns
|
||||||
taos_free_result(pPrevSub);
|
taos_free_result(pPrevSub);
|
||||||
|
|
||||||
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
|
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, TSDB_SQL_SELECT, pSupporter, NULL);
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
tscDestroyJoinSupporter(pSupporter);
|
tscDestroyJoinSupporter(pSupporter);
|
||||||
success = false;
|
success = false;
|
||||||
|
@ -834,7 +834,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
|
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, TSDB_SQL_SELECT, pSupporter, NULL);
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -1386,7 +1386,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
||||||
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
|
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
|
||||||
const int32_t table_index = 0;
|
const int32_t table_index = 0;
|
||||||
|
|
||||||
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
|
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
|
||||||
if (pNew != NULL) { // the sub query of two-stage super table query
|
if (pNew != NULL) { // the sub query of two-stage super table query
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||||
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
|
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
|
||||||
|
@ -1545,7 +1545,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
pSupporter->pSql = pSql;
|
pSupporter->pSql = pSql;
|
||||||
pSupporter->pState = pState;
|
pSupporter->pState = pState;
|
||||||
|
|
||||||
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL);
|
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL);
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
|
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
|
||||||
break;
|
break;
|
||||||
|
@ -1563,7 +1563,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
|
|
||||||
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
|
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
|
||||||
SSqlObj *pSub = pSql->pSubs[j];
|
SSqlObj *pSub = pSql->pSubs[j];
|
||||||
pSub->cmd.command = TSDB_SQL_INSERT;
|
|
||||||
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
|
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -192,7 +192,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
|
||||||
|
|
||||||
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
|
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
|
||||||
#endif
|
#endif
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
|
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
|
||||||
|
@ -662,7 +662,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
|
||||||
strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN);
|
strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The metermeta may be released since the metermeta cache are completed clean by other thread
|
* The table meta may be released since the table meta cache are completed clean by other thread
|
||||||
* due to operation such as drop database. So here we add the reference count directly instead of invoke
|
* due to operation such as drop database. So here we add the reference count directly instead of invoke
|
||||||
* taosGetDataFromCache, which may return NULL value.
|
* taosGetDataFromCache, which may return NULL value.
|
||||||
*/
|
*/
|
||||||
|
@ -1755,29 +1755,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t command = pSql->cmd.command;
|
int32_t command = pSql->cmd.command;
|
||||||
if (command == TSDB_SQL_CONNECT) {
|
if (command == TSDB_SQL_CONNECT || command == TSDB_SQL_INSERT) {
|
||||||
return true;
|
return true;
|
||||||
}
|
|
||||||
|
|
||||||
if (command == TSDB_SQL_INSERT) {
|
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* in case of multi-vnode insertion, the object should not be released until all
|
|
||||||
* data blocks have been submit to vnode.
|
|
||||||
*/
|
|
||||||
SDataBlockList* pDataBlocks = pCmd->pDataBlocks;
|
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
|
||||||
|
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
|
||||||
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
|
|
||||||
|
|
||||||
if (pDataBlocks == NULL || pTableMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
|
|
||||||
tscTrace("%p object should be release since all data blocks have been submit", pSql);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
return tscKeepConn[command] == 0 ||
|
return tscKeepConn[command] == 0 ||
|
||||||
(pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS && pSql->res.code != TSDB_CODE_SUCCESS);
|
(pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS && pSql->res.code != TSDB_CODE_SUCCESS);
|
||||||
|
@ -1997,7 +1976,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
|
||||||
pRes->numOfRows = 0;
|
pRes->numOfRows = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) {
|
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
|
||||||
|
|
||||||
|
@ -2020,7 +1999,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
|
|
||||||
memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd));
|
memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd));
|
||||||
|
|
||||||
pNew->cmd.command = TSDB_SQL_SELECT;
|
pNew->cmd.command = cmd;
|
||||||
pNew->cmd.payload = NULL;
|
pNew->cmd.payload = NULL;
|
||||||
pNew->cmd.allocSize = 0;
|
pNew->cmd.allocSize = 0;
|
||||||
|
|
||||||
|
@ -2105,25 +2084,31 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
pNew->param = param;
|
pNew->param = param;
|
||||||
|
|
||||||
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
|
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
|
||||||
tscGetMetricMetaCacheKey(pQueryInfo, key, uid);
|
if (cmd == TSDB_SQL_SELECT) {
|
||||||
|
tscGetMetricMetaCacheKey(pQueryInfo, key, uid);
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef _DEBUG_VIEW
|
#ifdef _DEBUG_VIEW
|
||||||
printf("the metricmeta key is:%s\n", key);
|
tscTrace("the metricmeta key is:%s", key);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
char* name = pTableMetaInfo->name;
|
char* name = pTableMetaInfo->name;
|
||||||
STableMetaInfo* pFinalInfo = NULL;
|
STableMetaInfo* pFinalInfo = NULL;
|
||||||
|
|
||||||
if (pPrevSql == NULL) {
|
if (pPrevSql == NULL) {
|
||||||
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
|
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
|
||||||
SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
|
|
||||||
|
SSuperTableMeta* pMetricMeta = NULL;
|
||||||
|
if (cmd == TSDB_SQL_SELECT) {
|
||||||
|
pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
|
||||||
|
}
|
||||||
|
|
||||||
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pTableMeta, pMetricMeta, pTableMetaInfo->numOfTags,
|
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pTableMeta, pMetricMeta, pTableMetaInfo->numOfTags,
|
||||||
pTableMetaInfo->tagColumnIndex);
|
pTableMetaInfo->tagColumnIndex);
|
||||||
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object.
|
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object.
|
||||||
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
|
// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
|
||||||
|
|
||||||
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
|
// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
|
||||||
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
|
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
|
||||||
|
|
||||||
// pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
|
// pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
|
||||||
|
@ -2135,14 +2120,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
// assert(pFinalInfo->pMetricMeta != NULL);
|
// assert(pFinalInfo->pMetricMeta != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscTrace(
|
if (cmd == TSDB_SQL_SELECT) {
|
||||||
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
|
tscTrace(
|
||||||
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
|
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
|
||||||
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
|
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
|
||||||
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
|
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
|
||||||
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
|
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
|
||||||
|
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
|
||||||
tscPrintSelectClause(pNew, 0);
|
|
||||||
|
tscPrintSelectClause(pNew, 0);
|
||||||
|
} else {
|
||||||
|
tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vnodeIndex);
|
||||||
|
}
|
||||||
|
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
@ -2226,12 +2215,12 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
|
||||||
* in case of multi-vnode super table projection query and the result does not reach the limitation.
|
* in case of multi-vnode super table projection query and the result does not reach the limitation.
|
||||||
*/
|
*/
|
||||||
bool hasMoreVnodesToTry(SSqlObj* pSql) {
|
bool hasMoreVnodesToTry(SSqlObj* pSql) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
// SSqlCmd* pCmd = &pSql->cmd;
|
||||||
SSqlRes* pRes = &pSql->res;
|
// SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
|
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
|
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
|
||||||
return false;
|
return false;
|
||||||
// }
|
// }
|
||||||
|
|
|
@ -392,9 +392,9 @@ typedef struct SSqlFuncExprMsg {
|
||||||
} SSqlFuncExprMsg;
|
} SSqlFuncExprMsg;
|
||||||
|
|
||||||
typedef struct SSqlBinaryExprInfo {
|
typedef struct SSqlBinaryExprInfo {
|
||||||
struct tSQLBinaryExpr *pBinExpr; /* for binary expression */
|
struct tExprNode *pBinExpr; /* for binary expression */
|
||||||
int32_t numOfCols; /* binary expression involves the readed number of columns*/
|
int32_t numOfCols; /* binary expression involves the readed number of columns*/
|
||||||
SColIndexEx * pReqColumns; /* source column list */
|
SColIndexEx * pReqColumns; /* source column list */
|
||||||
} SSqlBinaryExprInfo;
|
} SSqlBinaryExprInfo;
|
||||||
|
|
||||||
typedef struct SSqlFunctionExpr {
|
typedef struct SSqlFunctionExpr {
|
||||||
|
|
|
@ -1191,7 +1191,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN + 1);
|
memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN);
|
||||||
pCreate->contLen = htonl(contLen);
|
pCreate->contLen = htonl(contLen);
|
||||||
pCreate->vgId = htonl(pTable->vgId);
|
pCreate->vgId = htonl(pTable->vgId);
|
||||||
pCreate->tableType = pTable->info.type;
|
pCreate->tableType = pTable->info.type;
|
||||||
|
@ -1318,7 +1318,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid);
|
mTrace("table:%s, create table in vgroup, id:%d, uid:%" PRIu64 , pTable->info.tableId, pTable->sid, pTable->uid);
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1639,7 +1639,7 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
|
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
|
||||||
strcpy(pCreateMsg->tableId, pInfo->tableId);
|
strncpy(pCreateMsg->tableId, pInfo->tableId, tListLen(pInfo->tableId));
|
||||||
|
|
||||||
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
|
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
|
||||||
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
|
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
|
||||||
|
|
|
@ -450,10 +450,10 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
|
||||||
|
|
||||||
if (ptNode != NULL) {
|
if (ptNode != NULL) {
|
||||||
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
||||||
pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode));
|
pTrace("key:%s is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
|
||||||
} else {
|
} else {
|
||||||
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
||||||
pTrace("key:%s not in cache,retrieved failed", key);
|
pTrace("key:%s not in cache, retrieved failed", key);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
|
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
|
||||||
|
@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ref = T_REF_INC(ptNode);
|
int32_t ref = T_REF_INC(ptNode);
|
||||||
pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref)
|
pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref)
|
||||||
|
|
||||||
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
||||||
assert(ref >= 2);
|
assert(ref >= 2);
|
||||||
|
@ -515,17 +515,15 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
*data = NULL;
|
*data = NULL;
|
||||||
|
int16_t ref = T_REF_DEC(pNode);
|
||||||
|
pTrace("%p is released, refcnt:%d", pNode, ref);
|
||||||
|
|
||||||
if (_remove) {
|
if (_remove) {
|
||||||
__cache_wr_lock(pCacheObj);
|
__cache_wr_lock(pCacheObj);
|
||||||
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
|
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
|
||||||
// So we need to lock it in the first place.
|
// So we need to lock it in the first place.
|
||||||
T_REF_DEC(pNode);
|
|
||||||
taosCacheMoveToTrash(pCacheObj, pNode);
|
taosCacheMoveToTrash(pCacheObj, pNode);
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
__cache_unlock(pCacheObj);
|
||||||
} else {
|
|
||||||
T_REF_DEC(pNode);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue