commit
cef01df630
|
@ -30,6 +30,7 @@ extern "C" {
|
|||
#include "tsqlfunction.h"
|
||||
#include "tutil.h"
|
||||
#include "tcache.h"
|
||||
#include "tref.h"
|
||||
|
||||
#include "qExecutor.h"
|
||||
#include "qSqlparser.h"
|
||||
|
@ -446,7 +447,7 @@ void tscFreeSqlObj(SSqlObj *pSql);
|
|||
void tscFreeRegisteredSqlObj(void *pSql);
|
||||
void tscFreeTableMetaHelper(void *pTableMeta);
|
||||
|
||||
void tscCloseTscObj(STscObj *pObj);
|
||||
void tscCloseTscObj(void *pObj);
|
||||
|
||||
// todo move to taos? or create a new file: taos_internal.h
|
||||
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||
|
@ -516,6 +517,7 @@ extern void * tscQhandle;
|
|||
extern int tscKeepConn[];
|
||||
extern int tsInsertHeadSize;
|
||||
extern int tscNumOfThreads;
|
||||
extern int tscRefId;
|
||||
|
||||
extern SRpcCorEpSet tscMgmtEpSet;
|
||||
|
||||
|
|
|
@ -190,18 +190,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
|||
|
||||
void tscProcessActivityTimer(void *handle, void *tmrId) {
|
||||
STscObj *pObj = (STscObj *)handle;
|
||||
if (pObj == NULL || pObj->signature != pObj) {
|
||||
|
||||
int ret = taosAcquireRef(tscRefId, pObj);
|
||||
if (ret < 0) {
|
||||
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
|
||||
return;
|
||||
}
|
||||
|
||||
SSqlObj* pHB = pObj->pHb;
|
||||
if (pObj->pTimer != tmrId || pHB == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
tscWarn("%p HB object has been released already", pHB);
|
||||
taosReleaseRef(tscRefId, pObj);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -213,6 +214,8 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
|
||||
}
|
||||
|
||||
taosReleaseRef(tscRefId, pObj);
|
||||
}
|
||||
|
||||
int tscSendMsgToServer(SSqlObj *pSql) {
|
||||
|
|
|
@ -161,6 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
|
|||
registerSqlObj(pSql);
|
||||
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||
|
||||
taosAddRef(tscRefId, pObj);
|
||||
return pSql;
|
||||
}
|
||||
|
||||
|
@ -296,7 +297,8 @@ void taos_close(TAOS *taos) {
|
|||
}
|
||||
|
||||
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
|
||||
tscCloseTscObj(pObj);
|
||||
|
||||
taosRemoveRef(tscRefId, pObj);
|
||||
}
|
||||
|
||||
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
|
||||
|
|
|
@ -36,6 +36,7 @@ void * tscTmr;
|
|||
void * tscQhandle;
|
||||
void * tscCheckDiskUsageTmr;
|
||||
int tsInsertHeadSize;
|
||||
int tscRefId;
|
||||
|
||||
int tscNumOfThreads;
|
||||
|
||||
|
@ -146,6 +147,8 @@ void taos_init_imp(void) {
|
|||
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
|
||||
}
|
||||
|
||||
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
||||
|
||||
tscDebug("client is initialized successfully");
|
||||
}
|
||||
|
||||
|
@ -165,6 +168,7 @@ void taos_cleanup() {
|
|||
tscQhandle = NULL;
|
||||
}
|
||||
|
||||
taosCloseRef(tscRefId);
|
||||
taosCleanupKeywordsTable();
|
||||
taosCloseLog();
|
||||
|
||||
|
|
|
@ -404,7 +404,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
|
|||
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
|
||||
if (ref == 0) {
|
||||
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
|
||||
tscCloseTscObj(pTscObj);
|
||||
taosRemoveRef(tscRefId, pTscObj);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -786,8 +786,8 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
|||
}
|
||||
|
||||
// TODO: all subqueries should be freed correctly before close this connection.
|
||||
void tscCloseTscObj(STscObj* pObj) {
|
||||
assert(pObj != NULL);
|
||||
void tscCloseTscObj(void *param) {
|
||||
STscObj *pObj = param;
|
||||
|
||||
pObj->signature = NULL;
|
||||
taosTmrStopA(&(pObj->pTimer));
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
|
||||
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
|
|
@ -28,7 +28,7 @@ extern "C" {
|
|||
#else
|
||||
#define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = (0x80000000 | ((mod)<<16) | (code));
|
||||
#endif
|
||||
|
||||
|
||||
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
|
||||
#define TAOS_SUCCEEDED(err) ((err) >= 0)
|
||||
#define TAOS_FAILED(err) ((err) < 0)
|
||||
|
@ -37,7 +37,7 @@ const char* tstrerror(int32_t err);
|
|||
|
||||
int32_t* taosGetErrno();
|
||||
#define terrno (*taosGetErrno())
|
||||
|
||||
|
||||
#define TSDB_CODE_SUCCESS 0
|
||||
|
||||
#ifdef TAOS_ERROR_C
|
||||
|
@ -74,6 +74,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "Memory cor
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "Out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "Invalid config message")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, 0, 0x0104, "Data file corrupted")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, 0, 0x0105, "Ref out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, 0, 0x0106, "too many Ref Objs")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, 0, 0x0107, "Ref ID is removed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, 0, 0x0108, "Invalid Ref ID")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, 0, 0x0109, "Ref is already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, 0, 0x010A, "Ref is not there")
|
||||
|
||||
//client
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "Invalid SQL statement")
|
||||
|
@ -182,7 +188,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "No permission for disk files in dnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "Invalid message length")
|
||||
|
||||
// vnode
|
||||
// vnode
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "Action in progress")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_MSG_NOT_PROCESSED, 0, 0x0501, "Message not processed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_NEED_REPROCESSED, 0, 0x0502, "Action need to be reprocessed")
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "lz4.h"
|
||||
#include "tref.h"
|
||||
#include "taoserror.h"
|
||||
#include "tsocket.h"
|
||||
#include "tglobal.h"
|
||||
|
@ -72,7 +73,6 @@ typedef struct {
|
|||
SRpcInfo *pRpc; // associated SRpcInfo
|
||||
SRpcEpSet epSet; // ip list provided by app
|
||||
void *ahandle; // handle provided by app
|
||||
void *signature; // for validation
|
||||
struct SRpcConn *pConn; // pConn allocated
|
||||
char msgType; // message type
|
||||
uint8_t *pCont; // content provided by app
|
||||
|
@ -132,6 +132,10 @@ int tsRpcMaxRetry;
|
|||
int tsRpcHeadSize;
|
||||
int tsRpcOverhead;
|
||||
|
||||
static int tsRpcRefId = -1;
|
||||
static int32_t tsRpcNum = 0;
|
||||
static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
|
||||
|
||||
// server:0 client:1 tcp:2 udp:0
|
||||
#define RPC_CONN_UDPS 0
|
||||
#define RPC_CONN_UDPC 1
|
||||
|
@ -211,14 +215,21 @@ static void rpcUnlockConn(SRpcConn *pConn);
|
|||
static void rpcAddRef(SRpcInfo *pRpc);
|
||||
static void rpcDecRef(SRpcInfo *pRpc);
|
||||
|
||||
void *rpcOpen(const SRpcInit *pInit) {
|
||||
SRpcInfo *pRpc;
|
||||
static void rpcInit(void) {
|
||||
|
||||
tsProgressTimer = tsRpcTimer/2;
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
|
||||
tsRpcHeadSize = RPC_MSG_OVERHEAD;
|
||||
tsRpcOverhead = sizeof(SRpcReqContext);
|
||||
|
||||
tsRpcRefId = taosOpenRef(200, free);
|
||||
}
|
||||
|
||||
void *rpcOpen(const SRpcInit *pInit) {
|
||||
SRpcInfo *pRpc;
|
||||
|
||||
pthread_once(&tsRpcInit, rpcInit);
|
||||
|
||||
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
|
||||
if (pRpc == NULL) return NULL;
|
||||
|
||||
|
@ -237,6 +248,8 @@ void *rpcOpen(const SRpcInit *pInit) {
|
|||
pRpc->afp = pInit->afp;
|
||||
pRpc->refCount = 1;
|
||||
|
||||
atomic_add_fetch_32(&tsRpcNum, 1);
|
||||
|
||||
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
||||
pRpc->connList = (SRpcConn *)calloc(1, size);
|
||||
if (pRpc->connList == NULL) {
|
||||
|
@ -363,7 +376,6 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
|||
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
||||
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||
pContext->ahandle = pMsg->ahandle;
|
||||
pContext->signature = pContext;
|
||||
pContext->pRpc = (SRpcInfo *)shandle;
|
||||
pContext->epSet = *pEpSet;
|
||||
pContext->contLen = contLen;
|
||||
|
@ -386,6 +398,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
|||
// set the handle to pContext, so app can cancel the request
|
||||
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
|
||||
|
||||
taosAddRef(tsRpcRefId, pContext);
|
||||
rpcSendReqToServer(pRpc, pContext);
|
||||
|
||||
return;
|
||||
|
@ -536,14 +549,15 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
|
|||
void rpcCancelRequest(void *handle) {
|
||||
SRpcReqContext *pContext = handle;
|
||||
|
||||
// signature is used to check if pContext is freed.
|
||||
// pContext may have been released just before app calls the rpcCancelRequest
|
||||
if (pContext == NULL || pContext->signature != pContext) return;
|
||||
int code = taosAcquireRef(tsRpcRefId, pContext);
|
||||
if (code < 0) return;
|
||||
|
||||
if (pContext->pConn) {
|
||||
tDebug("%s, app tries to cancel request", pContext->pConn->info);
|
||||
rpcCloseConn(pContext->pConn);
|
||||
}
|
||||
|
||||
taosReleaseRef(tsRpcRefId, pContext);
|
||||
}
|
||||
|
||||
static void rpcFreeMsg(void *msg) {
|
||||
|
@ -612,7 +626,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
|
|||
// if there is an outgoing message, free it
|
||||
if (pConn->outType && pConn->pReqMsg) {
|
||||
if (pConn->pContext) pConn->pContext->pConn = NULL;
|
||||
rpcFreeMsg(pConn->pReqMsg);
|
||||
taosRemoveRef(tsRpcRefId, pConn->pContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1068,7 +1082,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
|||
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
||||
SRpcInfo *pRpc = pContext->pRpc;
|
||||
|
||||
pContext->signature = NULL;
|
||||
pContext->pConn = NULL;
|
||||
if (pContext->pRsp) {
|
||||
// for synchronous API
|
||||
|
@ -1085,7 +1098,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
// free the request message
|
||||
rpcFreeCont(pContext->pCont);
|
||||
taosRemoveRef(tsRpcRefId, pContext);
|
||||
}
|
||||
|
||||
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
|
||||
|
@ -1593,6 +1606,12 @@ static void rpcDecRef(SRpcInfo *pRpc)
|
|||
pthread_mutex_destroy(&pRpc->mutex);
|
||||
tDebug("%s rpc resources are released", pRpc->label);
|
||||
taosTFree(pRpc);
|
||||
|
||||
int count = atomic_sub_fetch_32(&tsRpcNum, 1);
|
||||
if (count == 0) {
|
||||
taosCloseRef(tsRpcRefId);
|
||||
// tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
|
||||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_TREF_H
|
||||
#define TDENGINE_TREF_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs
|
||||
void taosCloseRef(int refId);
|
||||
int taosListRef(); // return the number of references in system
|
||||
int taosAddRef(int refId, void *p);
|
||||
int taosAcquireRef(int refId, void *p);
|
||||
void taosReleaseRef(int refId, void *p);
|
||||
|
||||
#define taosRemoveRef taosReleaseRef
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_TREF_H
|
|
@ -0,0 +1,402 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tulog.h"
|
||||
#include "tutil.h"
|
||||
|
||||
#define TSDB_REF_OBJECTS 50
|
||||
#define TSDB_REF_STATE_EMPTY 0
|
||||
#define TSDB_REF_STATE_ACTIVE 1
|
||||
#define TSDB_REF_STATE_DELETED 2
|
||||
|
||||
typedef struct SRefNode {
|
||||
struct SRefNode *prev;
|
||||
struct SRefNode *next;
|
||||
void *p;
|
||||
int32_t count;
|
||||
} SRefNode;
|
||||
|
||||
typedef struct {
|
||||
SRefNode **nodeList;
|
||||
int state; // 0: empty, 1: active; 2: deleted
|
||||
int refId;
|
||||
int max;
|
||||
int32_t count; // total number of SRefNodes in this set
|
||||
int64_t *lockedBy;
|
||||
void (*fp)(void *);
|
||||
} SRefSet;
|
||||
|
||||
static SRefSet tsRefSetList[TSDB_REF_OBJECTS];
|
||||
static pthread_once_t tsRefModuleInit = PTHREAD_ONCE_INIT;
|
||||
static pthread_mutex_t tsRefMutex;
|
||||
static int tsRefSetNum = 0;
|
||||
static int tsNextId = 0;
|
||||
|
||||
static void taosInitRefModule(void);
|
||||
static int taosHashRef(SRefSet *pSet, void *p);
|
||||
static void taosLockList(int64_t *lockedBy);
|
||||
static void taosUnlockList(int64_t *lockedBy);
|
||||
static void taosIncRefCount(SRefSet *pSet);
|
||||
static void taosDecRefCount(SRefSet *pSet);
|
||||
|
||||
int taosOpenRef(int max, void (*fp)(void *))
|
||||
{
|
||||
SRefNode **nodeList;
|
||||
SRefSet *pSet;
|
||||
int64_t *lockedBy;
|
||||
int i, refId;
|
||||
|
||||
pthread_once(&tsRefModuleInit, taosInitRefModule);
|
||||
|
||||
nodeList = calloc(sizeof(SRefNode *), (size_t)max);
|
||||
if (nodeList == NULL) {
|
||||
return TSDB_CODE_REF_NO_MEMORY;
|
||||
}
|
||||
|
||||
lockedBy = calloc(sizeof(int64_t), (size_t)max);
|
||||
if (lockedBy == NULL) {
|
||||
free(nodeList);
|
||||
return TSDB_CODE_REF_NO_MEMORY;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&tsRefMutex);
|
||||
|
||||
for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
||||
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
|
||||
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
|
||||
}
|
||||
|
||||
if (i < TSDB_REF_OBJECTS) {
|
||||
refId = tsNextId;
|
||||
pSet = tsRefSetList + refId;
|
||||
taosIncRefCount(pSet);
|
||||
pSet->max = max;
|
||||
pSet->nodeList = nodeList;
|
||||
pSet->lockedBy = lockedBy;
|
||||
pSet->fp = fp;
|
||||
pSet->state = TSDB_REF_STATE_ACTIVE;
|
||||
pSet->refId = refId;
|
||||
|
||||
tsRefSetNum++;
|
||||
uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum);
|
||||
} else {
|
||||
refId = TSDB_CODE_REF_FULL;
|
||||
free (nodeList);
|
||||
free (lockedBy);
|
||||
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&tsRefMutex);
|
||||
|
||||
return refId;
|
||||
}
|
||||
|
||||
void taosCloseRef(int refId)
|
||||
{
|
||||
SRefSet *pSet;
|
||||
int deleted = 0;
|
||||
|
||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
||||
uTrace("refId:%d is invalid, out of range", refId);
|
||||
return;
|
||||
}
|
||||
|
||||
pSet = tsRefSetList + refId;
|
||||
|
||||
pthread_mutex_lock(&tsRefMutex);
|
||||
|
||||
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
|
||||
pSet->state = TSDB_REF_STATE_DELETED;
|
||||
deleted = 1;
|
||||
uTrace("refId:%d is closed, count:%d", refId, pSet->count);
|
||||
} else {
|
||||
uTrace("refId:%d is already closed, count:%d", refId, pSet->count);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&tsRefMutex);
|
||||
|
||||
if (deleted) taosDecRefCount(pSet);
|
||||
}
|
||||
|
||||
int taosAddRef(int refId, void *p)
|
||||
{
|
||||
int hash;
|
||||
SRefNode *pNode;
|
||||
SRefSet *pSet;
|
||||
|
||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
||||
uTrace("refId:%d p:%p failed to add, refId not valid", refId, p);
|
||||
return TSDB_CODE_REF_INVALID_ID;
|
||||
}
|
||||
|
||||
uTrace("refId:%d p:%p try to add", refId, p);
|
||||
|
||||
pSet = tsRefSetList + refId;
|
||||
taosIncRefCount(pSet);
|
||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||
taosDecRefCount(pSet);
|
||||
uTrace("refId:%d p:%p failed to add, not active", refId, p);
|
||||
return TSDB_CODE_REF_ID_REMOVED;
|
||||
}
|
||||
|
||||
int code = 0;
|
||||
hash = taosHashRef(pSet, p);
|
||||
|
||||
taosLockList(pSet->lockedBy+hash);
|
||||
|
||||
pNode = pSet->nodeList[hash];
|
||||
while ( pNode ) {
|
||||
if ( pNode->p == p )
|
||||
break;
|
||||
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
if (pNode) {
|
||||
code = TSDB_CODE_REF_ALREADY_EXIST;
|
||||
uTrace("refId:%d p:%p is already there, faild to add", refId, p);
|
||||
} else {
|
||||
pNode = calloc(sizeof(SRefNode), 1);
|
||||
if (pNode) {
|
||||
pNode->p = p;
|
||||
pNode->count = 1;
|
||||
pNode->prev = 0;
|
||||
pNode->next = pSet->nodeList[hash];
|
||||
pSet->nodeList[hash] = pNode;
|
||||
uTrace("refId:%d p:%p is added, count::%d", refId, p, pSet->count);
|
||||
} else {
|
||||
code = TSDB_CODE_REF_NO_MEMORY;
|
||||
uTrace("refId:%d p:%p is not added, since no memory", refId, p);
|
||||
}
|
||||
}
|
||||
|
||||
if (code < 0) taosDecRefCount(pSet);
|
||||
|
||||
taosUnlockList(pSet->lockedBy+hash);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int taosAcquireRef(int refId, void *p)
|
||||
{
|
||||
int hash, code = 0;
|
||||
SRefNode *pNode;
|
||||
SRefSet *pSet;
|
||||
|
||||
if ( refId < 0 || refId >= TSDB_REF_OBJECTS ) {
|
||||
uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p);
|
||||
return TSDB_CODE_REF_INVALID_ID;
|
||||
}
|
||||
|
||||
uTrace("refId:%d p:%p try to acquire", refId, p);
|
||||
|
||||
pSet = tsRefSetList + refId;
|
||||
taosIncRefCount(pSet);
|
||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||
uTrace("refId:%d p:%p failed to acquire, not active", refId, p);
|
||||
taosDecRefCount(pSet);
|
||||
return TSDB_CODE_REF_ID_REMOVED;
|
||||
}
|
||||
|
||||
hash = taosHashRef(pSet, p);
|
||||
|
||||
taosLockList(pSet->lockedBy+hash);
|
||||
|
||||
pNode = pSet->nodeList[hash];
|
||||
|
||||
while (pNode) {
|
||||
if (pNode->p == p) {
|
||||
break;
|
||||
}
|
||||
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
if (pNode) {
|
||||
pNode->count++;
|
||||
uTrace("refId:%d p:%p is acquired", refId, p);
|
||||
} else {
|
||||
code = TSDB_CODE_REF_NOT_EXIST;
|
||||
uTrace("refId:%d p:%p is not there, failed to acquire", refId, p);
|
||||
}
|
||||
|
||||
taosUnlockList(pSet->lockedBy+hash);
|
||||
|
||||
taosDecRefCount(pSet);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void taosReleaseRef(int refId, void *p)
|
||||
{
|
||||
int hash;
|
||||
SRefNode *pNode;
|
||||
SRefSet *pSet;
|
||||
int released = 0;
|
||||
|
||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
||||
uTrace("refId:%d p:%p failed to release, refId not valid", refId, p);
|
||||
return;
|
||||
}
|
||||
|
||||
uTrace("refId:%d p:%p try to release", refId, p);
|
||||
|
||||
pSet = tsRefSetList + refId;
|
||||
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
||||
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
|
||||
return;
|
||||
}
|
||||
|
||||
hash = taosHashRef(pSet, p);
|
||||
|
||||
taosLockList(pSet->lockedBy+hash);
|
||||
|
||||
pNode = pSet->nodeList[hash];
|
||||
while (pNode) {
|
||||
if ( pNode->p == p )
|
||||
break;
|
||||
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
if (pNode) {
|
||||
pNode->count--;
|
||||
|
||||
if (pNode->count == 0) {
|
||||
if ( pNode->prev ) {
|
||||
pNode->prev->next = pNode->next;
|
||||
} else {
|
||||
pSet->nodeList[hash] = pNode->next;
|
||||
}
|
||||
|
||||
if ( pNode->next ) {
|
||||
pNode->next->prev = pNode->prev;
|
||||
}
|
||||
|
||||
(*pSet->fp)(pNode->p);
|
||||
|
||||
free(pNode);
|
||||
released = 1;
|
||||
uTrace("refId:%d p:%p is removed, count::%d", refId, p, pSet->count);
|
||||
} else {
|
||||
uTrace("refId:%d p:%p is released", refId, p);
|
||||
}
|
||||
} else {
|
||||
uTrace("refId:%d p:%p is not there, failed to release", refId, p);
|
||||
}
|
||||
|
||||
taosUnlockList(pSet->lockedBy+hash);
|
||||
|
||||
if (released) taosDecRefCount(pSet);
|
||||
}
|
||||
|
||||
int taosListRef() {
|
||||
SRefSet *pSet;
|
||||
SRefNode *pNode;
|
||||
int num = 0;
|
||||
|
||||
pthread_mutex_lock(&tsRefMutex);
|
||||
|
||||
for (int i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
||||
pSet = tsRefSetList + i;
|
||||
|
||||
if (pSet->state == TSDB_REF_STATE_EMPTY)
|
||||
continue;
|
||||
|
||||
uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count);
|
||||
|
||||
for (int j=0; j < pSet->max; ++j) {
|
||||
pNode = pSet->nodeList[j];
|
||||
|
||||
while (pNode) {
|
||||
uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count);
|
||||
pNode = pNode->next;
|
||||
num++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&tsRefMutex);
|
||||
|
||||
return num;
|
||||
}
|
||||
|
||||
static int taosHashRef(SRefSet *pSet, void *p)
|
||||
{
|
||||
int hash = 0;
|
||||
int64_t v = (int64_t)p;
|
||||
|
||||
for (int i = 0; i < sizeof(v); ++i) {
|
||||
hash += (int)(v & 0xFFFF);
|
||||
v = v >> 16;
|
||||
i = i + 2;
|
||||
}
|
||||
|
||||
hash = hash % pSet->max;
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
static void taosLockList(int64_t *lockedBy) {
|
||||
int64_t tid = taosGetPthreadId();
|
||||
int i = 0;
|
||||
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
|
||||
if (++i % 100 == 0) {
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void taosUnlockList(int64_t *lockedBy) {
|
||||
int64_t tid = taosGetPthreadId();
|
||||
if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
static void taosInitRefModule(void) {
|
||||
pthread_mutex_init(&tsRefMutex, NULL);
|
||||
}
|
||||
|
||||
static void taosIncRefCount(SRefSet *pSet) {
|
||||
atomic_add_fetch_32(&pSet->count, 1);
|
||||
uTrace("refId:%d inc count:%d", pSet->refId, pSet->count);
|
||||
}
|
||||
|
||||
static void taosDecRefCount(SRefSet *pSet) {
|
||||
int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
|
||||
uTrace("refId:%d dec count:%d", pSet->refId, pSet->count);
|
||||
|
||||
if (count > 0) return;
|
||||
|
||||
pthread_mutex_lock(&tsRefMutex);
|
||||
|
||||
if (pSet->state != TSDB_REF_STATE_EMPTY) {
|
||||
pSet->state = TSDB_REF_STATE_EMPTY;
|
||||
pSet->max = 0;
|
||||
pSet->fp = NULL;
|
||||
|
||||
taosTFree(pSet->nodeList);
|
||||
taosTFree(pSet->lockedBy);
|
||||
|
||||
tsRefSetNum--;
|
||||
uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&tsRefMutex);
|
||||
}
|
||||
|
|
@ -13,3 +13,12 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR)
|
|||
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(utilTest tutil common osdetail gtest pthread gcov)
|
||||
ENDIF()
|
||||
|
||||
IF (TD_LINUX)
|
||||
ADD_EXECUTABLE(trefTest ./trefTest.c)
|
||||
TARGET_LINK_LIBRARIES(trefTest tutil common)
|
||||
ENDIF ()
|
||||
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include "os.h"
|
||||
#include "tref.h"
|
||||
#include "tlog.h"
|
||||
#include "tglobal.h"
|
||||
#include "taoserror.h"
|
||||
#include "tulog.h"
|
||||
|
||||
typedef struct {
|
||||
int refNum;
|
||||
int steps;
|
||||
int refId;
|
||||
void **p;
|
||||
} SRefSpace;
|
||||
|
||||
void *takeRefActions(void *param) {
|
||||
SRefSpace *pSpace = (SRefSpace *)param;
|
||||
int code, id;
|
||||
|
||||
for (int i=0; i < pSpace->steps; ++i) {
|
||||
printf("s");
|
||||
id = random() % pSpace->refNum;
|
||||
code = taosAddRef(pSpace->refId, pSpace->p[id]);
|
||||
usleep(1);
|
||||
|
||||
id = random() % pSpace->refNum;
|
||||
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
|
||||
if (code >= 0) {
|
||||
usleep(id % 5 + 1);
|
||||
taosReleaseRef(pSpace->refId, pSpace->p[id]);
|
||||
}
|
||||
|
||||
id = random() % pSpace->refNum;
|
||||
taosRemoveRef(pSpace->refId, pSpace->p[id]);
|
||||
usleep(id %5 + 1);
|
||||
|
||||
id = random() % pSpace->refNum;
|
||||
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
|
||||
if (code >= 0) {
|
||||
usleep(id % 5 + 1);
|
||||
taosReleaseRef(pSpace->refId, pSpace->p[id]);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i=0; i < pSpace->refNum; ++i) {
|
||||
taosRemoveRef(pSpace->refId, pSpace->p[i]);
|
||||
}
|
||||
|
||||
//uInfo("refId:%d thread exits", pSpace->refId);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void myfree(void *p) {
|
||||
return;
|
||||
}
|
||||
|
||||
void *openRefSpace(void *param) {
|
||||
SRefSpace *pSpace = (SRefSpace *)param;
|
||||
|
||||
printf("c");
|
||||
pSpace->refId = taosOpenRef(10000, myfree);
|
||||
|
||||
if (pSpace->refId < 0) {
|
||||
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
|
||||
for (int i=0; i<pSpace->refNum; ++i) {
|
||||
pSpace->p[i] = (void *) malloc(128);
|
||||
}
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
pthread_t thread1, thread2, thread3;
|
||||
pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace));
|
||||
pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace));
|
||||
pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace));
|
||||
|
||||
pthread_join(thread1, NULL);
|
||||
pthread_join(thread2, NULL);
|
||||
pthread_join(thread3, NULL);
|
||||
|
||||
taosCloseRef(pSpace->refId);
|
||||
|
||||
for (int i=0; i<pSpace->refNum; ++i) {
|
||||
free(pSpace->p[i]);
|
||||
}
|
||||
|
||||
uInfo("refId:%d main thread exit", pSpace->refId);
|
||||
free(pSpace->p);
|
||||
pSpace->p = NULL;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
int refNum = 100;
|
||||
int threads = 10;
|
||||
int steps = 10000;
|
||||
int loops = 1;
|
||||
|
||||
uDebugFlag = 143;
|
||||
|
||||
for (int i=1; i<argc; ++i) {
|
||||
if (strcmp(argv[i], "-n")==0 && i < argc-1) {
|
||||
refNum = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||
steps = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||
threads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
||||
loops = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||
uDebugFlag = atoi(argv[i]);
|
||||
} else {
|
||||
printf("\nusage: %s [options] \n", argv[0]);
|
||||
printf(" [-n]: number of references, default: %d\n", refNum);
|
||||
printf(" [-s]: steps to run for each reference, default: %d\n", steps);
|
||||
printf(" [-t]: number of refIds running in parallel, default: %d\n", threads);
|
||||
printf(" [-l]: number of loops, default: %d\n", loops);
|
||||
printf(" [-d]: debugFlag, default: %d\n", uDebugFlag);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
taosInitLog("tref.log", 5000000, 10);
|
||||
|
||||
SRefSpace *pSpaceList = (SRefSpace *) calloc(sizeof(SRefSpace), threads);
|
||||
pthread_t *pThreadList = (pthread_t *) calloc(sizeof(pthread_t), threads);
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i=0; i<loops; ++i) {
|
||||
printf("\nloop: %d\n", i);
|
||||
for (int j=0; j<threads; ++j) {
|
||||
pSpaceList[j].steps = steps;
|
||||
pSpaceList[j].refNum = refNum;
|
||||
pthread_create(&(pThreadList[j]), &thattr, openRefSpace, (void *)(pSpaceList+j));
|
||||
}
|
||||
|
||||
for (int j=0; j<threads; ++j) {
|
||||
pthread_join(pThreadList[j], NULL);
|
||||
}
|
||||
}
|
||||
|
||||
int num = taosListRef();
|
||||
printf("\nnumber of references:%d\n", num);
|
||||
|
||||
free(pSpaceList);
|
||||
free(pThreadList);
|
||||
|
||||
taosCloseLog();
|
||||
|
||||
return num;
|
||||
}
|
||||
|
Loading…
Reference in New Issue