second version
This commit is contained in:
parent
1eabb496dc
commit
5b735ad091
|
@ -82,6 +82,7 @@ typedef struct {
|
||||||
int8_t oldInUse; // server EP inUse passed by app
|
int8_t oldInUse; // server EP inUse passed by app
|
||||||
int8_t redirect; // flag to indicate redirect
|
int8_t redirect; // flag to indicate redirect
|
||||||
int8_t connType; // connection type
|
int8_t connType; // connection type
|
||||||
|
int32_t rid; // refId returned by taosAddRef
|
||||||
SRpcMsg *pRsp; // for synchronous API
|
SRpcMsg *pRsp; // for synchronous API
|
||||||
tsem_t *pSem; // for synchronous API
|
tsem_t *pSem; // for synchronous API
|
||||||
SRpcEpSet *pSet; // for synchronous API
|
SRpcEpSet *pSet; // for synchronous API
|
||||||
|
@ -374,7 +375,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
|
||||||
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
SRpcInfo *pRpc = (SRpcInfo *)shandle;
|
SRpcInfo *pRpc = (SRpcInfo *)shandle;
|
||||||
SRpcReqContext *pContext;
|
SRpcReqContext *pContext;
|
||||||
|
|
||||||
|
@ -403,10 +404,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
// set the handle to pContext, so app can cancel the request
|
// set the handle to pContext, so app can cancel the request
|
||||||
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
|
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
|
||||||
|
|
||||||
taosAddRef(tsRpcRefId, pContext);
|
pContext->rid = taosAddRef(tsRpcRefId, pContext);
|
||||||
|
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
|
|
||||||
return;
|
return pContext->rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendResponse(const SRpcMsg *pRsp) {
|
void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
|
@ -551,11 +553,10 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcCancelRequest(void *handle) {
|
void rpcCancelRequest(int64_t rid) {
|
||||||
SRpcReqContext *pContext = handle;
|
|
||||||
|
|
||||||
int code = taosAcquireRef(tsRpcRefId, pContext);
|
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
|
||||||
if (code < 0) return;
|
if (pContext == NULL) return;
|
||||||
|
|
||||||
rpcCloseConn(pContext->pConn);
|
rpcCloseConn(pContext->pConn);
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,10 @@ int taosCloseRef(int refId);
|
||||||
// add ref, p is the pointer to resource or pointer ID
|
// add ref, p is the pointer to resource or pointer ID
|
||||||
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
|
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
|
||||||
int64_t taosAddRef(int refId, void *p);
|
int64_t taosAddRef(int refId, void *p);
|
||||||
#define taosRemoveRef taosReleaseRef
|
|
||||||
|
// remove ref, rid is the reference ID returned by taosAddRef
|
||||||
|
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
|
||||||
|
int taosRemoveRef(int rsetId, int64_t rid);
|
||||||
|
|
||||||
// acquire ref, rid is the reference ID returned by taosAddRef
|
// acquire ref, rid is the reference ID returned by taosAddRef
|
||||||
// return the resource p. On error, NULL is returned, and terrno is set appropriately
|
// return the resource p. On error, NULL is returned, and terrno is set appropriately
|
||||||
|
|
|
@ -29,6 +29,7 @@ typedef struct SRefNode {
|
||||||
void *p; // pointer to resource protected,
|
void *p; // pointer to resource protected,
|
||||||
int64_t rid; // reference ID
|
int64_t rid; // reference ID
|
||||||
int32_t count; // number of references
|
int32_t count; // number of references
|
||||||
|
int removed; // 1: removed
|
||||||
} SRefNode;
|
} SRefNode;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -51,8 +52,9 @@ static int tsNextId = 0;
|
||||||
static void taosInitRefModule(void);
|
static void taosInitRefModule(void);
|
||||||
static void taosLockList(int64_t *lockedBy);
|
static void taosLockList(int64_t *lockedBy);
|
||||||
static void taosUnlockList(int64_t *lockedBy);
|
static void taosUnlockList(int64_t *lockedBy);
|
||||||
static void taosIncRefCount(SRefSet *pSet);
|
static void taosIncRsetCount(SRefSet *pSet);
|
||||||
static void taosDecRefCount(SRefSet *pSet);
|
static void taosDecRsetCount(SRefSet *pSet);
|
||||||
|
static int taosDecRefCount(int rsetId, int64_t rid, int remove);
|
||||||
|
|
||||||
int taosOpenRef(int max, void (*fp)(void *))
|
int taosOpenRef(int max, void (*fp)(void *))
|
||||||
{
|
{
|
||||||
|
@ -86,7 +88,7 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
if (i < TSDB_REF_OBJECTS) {
|
if (i < TSDB_REF_OBJECTS) {
|
||||||
rsetId = tsNextId;
|
rsetId = tsNextId;
|
||||||
pSet = tsRefSetList + rsetId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRsetCount(pSet);
|
||||||
pSet->max = max;
|
pSet->max = max;
|
||||||
pSet->nodeList = nodeList;
|
pSet->nodeList = nodeList;
|
||||||
pSet->lockedBy = lockedBy;
|
pSet->lockedBy = lockedBy;
|
||||||
|
@ -134,7 +136,7 @@ int taosCloseRef(int rsetId)
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
|
|
||||||
if (deleted) taosDecRefCount(pSet);
|
if (deleted) taosDecRsetCount(pSet);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -153,9 +155,9 @@ int64_t taosAddRef(int rsetId, void *p)
|
||||||
}
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + rsetId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRsetCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
taosDecRefCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
|
uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -187,6 +189,12 @@ int64_t taosAddRef(int rsetId, void *p)
|
||||||
return rid;
|
return rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int taosRemoveRef(int rsetId, int64_t rid)
|
||||||
|
{
|
||||||
|
return taosDecRefCount(rsetId, rid, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
||||||
void *taosAcquireRef(int rsetId, int64_t rid)
|
void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
int hash;
|
int hash;
|
||||||
|
@ -200,11 +208,17 @@ void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (rid <= 0) {
|
||||||
|
uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + rsetId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRsetCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
|
||||||
taosDecRefCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -223,9 +237,14 @@ void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
pNode->count++;
|
if (pNode->removed == 0) {
|
||||||
p = pNode->p;
|
pNode->count++;
|
||||||
uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
|
p = pNode->p;
|
||||||
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_REF_NOT_EXIST;
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
|
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
|
||||||
|
@ -233,75 +252,14 @@ void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
taosDecRefCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
|
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosReleaseRef(int rsetId, int64_t rid)
|
int taosReleaseRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
int hash;
|
return taosDecRefCount(rsetId, rid, 0);
|
||||||
SRefNode *pNode;
|
|
||||||
SRefSet *pSet;
|
|
||||||
int released = 0;
|
|
||||||
|
|
||||||
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " failed to release, rsetId not valid", rsetId, rid);
|
|
||||||
terrno = TSDB_CODE_REF_INVALID_ID;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSet = tsRefSetList + rsetId;
|
|
||||||
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " failed to release, cleaned", rsetId, rid);
|
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
terrno = 0;
|
|
||||||
hash = rid % pSet->max;
|
|
||||||
taosLockList(pSet->lockedBy+hash);
|
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
|
||||||
while (pNode) {
|
|
||||||
if (pNode->rid == rid)
|
|
||||||
break;
|
|
||||||
|
|
||||||
pNode = pNode->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode) {
|
|
||||||
pNode->count--;
|
|
||||||
|
|
||||||
if (pNode->count == 0) {
|
|
||||||
if ( pNode->prev ) {
|
|
||||||
pNode->prev->next = pNode->next;
|
|
||||||
} else {
|
|
||||||
pSet->nodeList[hash] = pNode->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( pNode->next ) {
|
|
||||||
pNode->next->prev = pNode->prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pSet->fp)(pNode->p);
|
|
||||||
|
|
||||||
uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
|
|
||||||
free(pNode);
|
|
||||||
released = 1;
|
|
||||||
} else {
|
|
||||||
uTrace("rsetId:%d p:%p rid:%" PRId64 "is released", rsetId, pNode->p, rid);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release", rsetId, rid);
|
|
||||||
terrno = TSDB_CODE_REF_NOT_EXIST;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
|
||||||
|
|
||||||
if (released) taosDecRefCount(pSet);
|
|
||||||
|
|
||||||
return terrno;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
||||||
|
@ -315,12 +273,18 @@ void *taosIterateRef(int rsetId, int64_t rid) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (rid <= 0) {
|
||||||
|
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + rsetId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRsetCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
taosDecRefCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,7 +335,7 @@ void *taosIterateRef(int rsetId, int64_t rid) {
|
||||||
|
|
||||||
if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
|
if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
|
||||||
|
|
||||||
taosDecRefCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
|
|
||||||
return newP;
|
return newP;
|
||||||
}
|
}
|
||||||
|
@ -407,6 +371,81 @@ int taosListRef() {
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
|
||||||
|
int hash;
|
||||||
|
SRefSet *pSet;
|
||||||
|
SRefNode *pNode;
|
||||||
|
int released = 0;
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
||||||
|
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_INVALID_ID;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rid <= 0) {
|
||||||
|
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSet = tsRefSetList + rsetId;
|
||||||
|
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
||||||
|
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
hash = rid % pSet->max;
|
||||||
|
|
||||||
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
|
pNode = pSet->nodeList[hash];
|
||||||
|
while (pNode) {
|
||||||
|
if (pNode->rid == rid)
|
||||||
|
break;
|
||||||
|
|
||||||
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode) {
|
||||||
|
pNode->count--;
|
||||||
|
if (remove) pNode->removed = 1;
|
||||||
|
|
||||||
|
if (pNode->count <= 0) {
|
||||||
|
if (pNode->prev) {
|
||||||
|
pNode->prev->next = pNode->next;
|
||||||
|
} else {
|
||||||
|
pSet->nodeList[hash] = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode->next) {
|
||||||
|
pNode->next->prev = pNode->prev;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pSet->fp)(pNode->p);
|
||||||
|
|
||||||
|
uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
|
||||||
|
free(pNode);
|
||||||
|
released = 1;
|
||||||
|
} else {
|
||||||
|
uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
|
if (released) taosDecRsetCount(pSet);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static void taosLockList(int64_t *lockedBy) {
|
static void taosLockList(int64_t *lockedBy) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetPthreadId();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -428,12 +467,12 @@ static void taosInitRefModule(void) {
|
||||||
pthread_mutex_init(&tsRefMutex, NULL);
|
pthread_mutex_init(&tsRefMutex, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosIncRefCount(SRefSet *pSet) {
|
static void taosIncRsetCount(SRefSet *pSet) {
|
||||||
atomic_add_fetch_32(&pSet->count, 1);
|
atomic_add_fetch_32(&pSet->count, 1);
|
||||||
uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count);
|
uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosDecRefCount(SRefSet *pSet) {
|
static void taosDecRsetCount(SRefSet *pSet) {
|
||||||
int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
|
int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
|
||||||
uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count);
|
uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count);
|
||||||
|
|
||||||
|
|
|
@ -11,106 +11,119 @@
|
||||||
#include "tulog.h"
|
#include "tulog.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int refNum;
|
int refNum;
|
||||||
int steps;
|
int steps;
|
||||||
int refId;
|
int rsetId;
|
||||||
void **p;
|
int64_t rid;
|
||||||
|
void **p;
|
||||||
} SRefSpace;
|
} SRefSpace;
|
||||||
|
|
||||||
void iterateRefs(int refId) {
|
void iterateRefs(int rsetId) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
void *p = taosIterateRef(refId, NULL);
|
void *p = taosIterateRef(rsetId, NULL);
|
||||||
while (p) {
|
while (p) {
|
||||||
// process P
|
// process P
|
||||||
count++;
|
count++;
|
||||||
p = taosIterateRef(refId, p);
|
p = taosIterateRef(rsetId, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
printf(" %d ", count);
|
printf(" %d ", count);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *takeRefActions(void *param) {
|
void *addRef(void *param) {
|
||||||
SRefSpace *pSpace = (SRefSpace *)param;
|
SRefSpace *pSpace = (SRefSpace *)param;
|
||||||
int code, id;
|
int id;
|
||||||
|
int64_t rid;
|
||||||
|
|
||||||
for (int i=0; i < pSpace->steps; ++i) {
|
for (int i=0; i < pSpace->steps; ++i) {
|
||||||
printf("s");
|
printf("a");
|
||||||
id = random() % pSpace->refNum;
|
id = random() % pSpace->refNum;
|
||||||
code = taosAddRef(pSpace->refId, pSpace->p[id]);
|
if (pSpace->rid[id] <= 0) {
|
||||||
usleep(1);
|
pSpace->p[id] = malloc(128);
|
||||||
|
pSpace->rid[id] = taosAddRef(pSpace->rsetId, pSpace->p[id]);
|
||||||
id = random() % pSpace->refNum;
|
|
||||||
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
|
|
||||||
if (code >= 0) {
|
|
||||||
usleep(id % 5 + 1);
|
|
||||||
taosReleaseRef(pSpace->refId, pSpace->p[id]);
|
|
||||||
}
|
}
|
||||||
|
usleep(100);
|
||||||
id = random() % pSpace->refNum;
|
|
||||||
taosRemoveRef(pSpace->refId, pSpace->p[id]);
|
|
||||||
usleep(id %5 + 1);
|
|
||||||
|
|
||||||
id = random() % pSpace->refNum;
|
|
||||||
code = taosAcquireRef(pSpace->refId, pSpace->p[id]);
|
|
||||||
if (code >= 0) {
|
|
||||||
usleep(id % 5 + 1);
|
|
||||||
taosReleaseRef(pSpace->refId, pSpace->p[id]);
|
|
||||||
}
|
|
||||||
|
|
||||||
id = random() % pSpace->refNum;
|
|
||||||
iterateRefs(id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i=0; i < pSpace->refNum; ++i) {
|
return NULL;
|
||||||
taosRemoveRef(pSpace->refId, pSpace->p[i]);
|
}
|
||||||
}
|
|
||||||
|
void *removeRef(void *param) {
|
||||||
//uInfo("refId:%d thread exits", pSpace->refId);
|
SRefSpace *pSpace = (SRefSpace *)param;
|
||||||
|
int id;
|
||||||
|
int64_t rid;
|
||||||
|
|
||||||
|
for (int i=0; i < pSpace->steps; ++i) {
|
||||||
|
printf("d");
|
||||||
|
id = random() % pSpace->refNum;
|
||||||
|
if (pSpace->rid[id] > 0) {
|
||||||
|
code = taosRemoveRef(pSpace->rsetId, pSpace->rid[id]);
|
||||||
|
if (code == 0) pSpace->rid[id] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
usleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *acquireRelease(void *param) {
|
||||||
|
SRefSpace *pSpace = (SRefSpace *)param;
|
||||||
|
int id;
|
||||||
|
int64_t rid;
|
||||||
|
|
||||||
|
for (int i=0; i < pSpace->steps; ++i) {
|
||||||
|
printf("a");
|
||||||
|
|
||||||
|
id = random() % pSpace->refNum;
|
||||||
|
code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]);
|
||||||
|
if (code >= 0) {
|
||||||
|
usleep(id % 5 + 1);
|
||||||
|
taosReleaseRef(pSpace->rsetId, pSpace->p[id]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void myfree(void *p) {
|
void myfree(void *p) {
|
||||||
return;
|
free(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *openRefSpace(void *param) {
|
void *openRefSpace(void *param) {
|
||||||
SRefSpace *pSpace = (SRefSpace *)param;
|
SRefSpace *pSpace = (SRefSpace *)param;
|
||||||
|
|
||||||
printf("c");
|
printf("c");
|
||||||
pSpace->refId = taosOpenRef(50, myfree);
|
pSpace->rsetId = taosOpenRef(50, myfree);
|
||||||
|
|
||||||
if (pSpace->refId < 0) {
|
if (pSpace->rsetId < 0) {
|
||||||
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId));
|
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->rsetId));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
|
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
|
||||||
for (int i=0; i<pSpace->refNum; ++i) {
|
|
||||||
pSpace->p[i] = (void *) malloc(128);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
pthread_t thread1, thread2, thread3;
|
pthread_t thread1, thread2, thread3;
|
||||||
pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace));
|
pthread_create(&(thread1), &thattr, addRef, (void *)(pSpace));
|
||||||
pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace));
|
pthread_create(&(thread2), &thattr, removeRef, (void *)(pSpace));
|
||||||
pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace));
|
pthread_create(&(thread3), &thattr, acquireRelease, (void *)(pSpace));
|
||||||
|
|
||||||
pthread_join(thread1, NULL);
|
pthread_join(thread1, NULL);
|
||||||
pthread_join(thread2, NULL);
|
pthread_join(thread2, NULL);
|
||||||
pthread_join(thread3, NULL);
|
pthread_join(thread3, NULL);
|
||||||
|
|
||||||
taosCloseRef(pSpace->refId);
|
|
||||||
|
|
||||||
for (int i=0; i<pSpace->refNum; ++i) {
|
for (int i=0; i<pSpace->refNum; ++i) {
|
||||||
free(pSpace->p[i]);
|
taosRemoveRef(pSpace->rsetId, pSpace->rid[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
uInfo("refId:%d main thread exit", pSpace->refId);
|
taosCloseRef(pSpace->rsetId);
|
||||||
|
|
||||||
|
uInfo("rsetId:%d main thread exit", pSpace->rsetId);
|
||||||
free(pSpace->p);
|
free(pSpace->p);
|
||||||
pSpace->p = NULL;
|
pSpace->p = NULL;
|
||||||
|
|
||||||
|
@ -140,7 +153,7 @@ int main(int argc, char *argv[]) {
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
printf(" [-n]: number of references, default: %d\n", refNum);
|
printf(" [-n]: number of references, default: %d\n", refNum);
|
||||||
printf(" [-s]: steps to run for each reference, default: %d\n", steps);
|
printf(" [-s]: steps to run for each reference, default: %d\n", steps);
|
||||||
printf(" [-t]: number of refIds running in parallel, default: %d\n", threads);
|
printf(" [-t]: number of rsetIds running in parallel, default: %d\n", threads);
|
||||||
printf(" [-l]: number of loops, default: %d\n", loops);
|
printf(" [-l]: number of loops, default: %d\n", loops);
|
||||||
printf(" [-d]: debugFlag, default: %d\n", uDebugFlag);
|
printf(" [-d]: debugFlag, default: %d\n", uDebugFlag);
|
||||||
exit(0);
|
exit(0);
|
||||||
|
|
Loading…
Reference in New Issue