taosIterateRef: shall NOT return removed node
This commit is contained in:
parent
3914317751
commit
7295cf43f9
|
@ -24,22 +24,22 @@
|
||||||
#define TSDB_REF_STATE_DELETED 2
|
#define TSDB_REF_STATE_DELETED 2
|
||||||
|
|
||||||
typedef struct SRefNode {
|
typedef struct SRefNode {
|
||||||
struct SRefNode *prev; // previous node
|
struct SRefNode *prev; // previous node
|
||||||
struct SRefNode *next; // next node
|
struct SRefNode *next; // next node
|
||||||
void *p; // pointer to resource protected,
|
void *p; // pointer to resource protected,
|
||||||
int64_t rid; // reference ID
|
int64_t rid; // reference ID
|
||||||
int32_t count; // number of references
|
int32_t count; // number of references
|
||||||
int removed; // 1: removed
|
int removed; // 1: removed
|
||||||
} SRefNode;
|
} SRefNode;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRefNode **nodeList; // array of SRefNode linked list
|
SRefNode **nodeList; // array of SRefNode linked list
|
||||||
int state; // 0: empty, 1: active; 2: deleted
|
int state; // 0: empty, 1: active; 2: deleted
|
||||||
int rsetId; // refSet ID, global unique
|
int rsetId; // refSet ID, global unique
|
||||||
int64_t rid; // increase by one for each new reference
|
int64_t rid; // increase by one for each new reference
|
||||||
int max; // mod
|
int max; // mod
|
||||||
int32_t count; // total number of SRefNodes in this set
|
int32_t count; // total number of SRefNodes in this set
|
||||||
int64_t *lockedBy;
|
int64_t *lockedBy;
|
||||||
void (*fp)(void *);
|
void (*fp)(void *);
|
||||||
} SRefSet;
|
} SRefSet;
|
||||||
|
|
||||||
|
@ -62,9 +62,9 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
int64_t *lockedBy;
|
int64_t *lockedBy;
|
||||||
int i, rsetId;
|
int i, rsetId;
|
||||||
|
|
||||||
pthread_once(&tsRefModuleInit, taosInitRefModule);
|
pthread_once(&tsRefModuleInit, taosInitRefModule);
|
||||||
|
|
||||||
nodeList = calloc(sizeof(SRefNode *), (size_t)max);
|
nodeList = calloc(sizeof(SRefNode *), (size_t)max);
|
||||||
if (nodeList == NULL) {
|
if (nodeList == NULL) {
|
||||||
terrno = TSDB_CODE_REF_NO_MEMORY;
|
terrno = TSDB_CODE_REF_NO_MEMORY;
|
||||||
|
@ -79,12 +79,12 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&tsRefMutex);
|
pthread_mutex_lock(&tsRefMutex);
|
||||||
|
|
||||||
for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
||||||
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
|
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
|
||||||
if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId
|
if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId
|
||||||
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
|
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i < TSDB_REF_OBJECTS) {
|
if (i < TSDB_REF_OBJECTS) {
|
||||||
rsetId = tsNextId;
|
rsetId = tsNextId;
|
||||||
|
@ -105,7 +105,7 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
free (nodeList);
|
free (nodeList);
|
||||||
free (lockedBy);
|
free (lockedBy);
|
||||||
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
|
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ int taosCloseRef(int rsetId)
|
||||||
|
|
||||||
pthread_mutex_lock(&tsRefMutex);
|
pthread_mutex_lock(&tsRefMutex);
|
||||||
|
|
||||||
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
|
||||||
pSet->state = TSDB_REF_STATE_DELETED;
|
pSet->state = TSDB_REF_STATE_DELETED;
|
||||||
deleted = 1;
|
deleted = 1;
|
||||||
uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
|
uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
|
||||||
|
@ -142,7 +142,7 @@ int taosCloseRef(int rsetId)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t taosAddRef(int rsetId, void *p)
|
int64_t taosAddRef(int rsetId, void *p)
|
||||||
{
|
{
|
||||||
int hash;
|
int hash;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
|
@ -163,7 +163,7 @@ int64_t taosAddRef(int rsetId, void *p)
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode = calloc(sizeof(SRefNode), 1);
|
pNode = calloc(sizeof(SRefNode), 1);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
terrno = TSDB_CODE_REF_NO_MEMORY;
|
terrno = TSDB_CODE_REF_NO_MEMORY;
|
||||||
|
@ -173,7 +173,7 @@ int64_t taosAddRef(int rsetId, void *p)
|
||||||
rid = atomic_add_fetch_64(&pSet->rid, 1);
|
rid = atomic_add_fetch_64(&pSet->rid, 1);
|
||||||
hash = rid % pSet->max;
|
hash = rid % pSet->max;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode->p = p;
|
pNode->p = p;
|
||||||
pNode->rid = rid;
|
pNode->rid = rid;
|
||||||
pNode->count = 1;
|
pNode->count = 1;
|
||||||
|
@ -187,16 +187,16 @@ int64_t taosAddRef(int rsetId, void *p)
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
return rid;
|
return rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosRemoveRef(int rsetId, int64_t rid)
|
int taosRemoveRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
return taosDecRefCount(rsetId, rid, 1);
|
return taosDecRefCount(rsetId, rid, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
||||||
void *taosAcquireRef(int rsetId, int64_t rid)
|
void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
int hash;
|
int hash;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
|
@ -223,7 +223,7 @@ void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
hash = rid % pSet->max;
|
hash = rid % pSet->max;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
|
@ -233,8 +233,8 @@ void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
if (pNode->rid == rid) {
|
if (pNode->rid == rid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
|
@ -258,7 +258,7 @@ void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosReleaseRef(int rsetId, int64_t rid)
|
int taosReleaseRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
return taosDecRefCount(rsetId, rid, 0);
|
return taosDecRefCount(rsetId, rid, 0);
|
||||||
}
|
}
|
||||||
|
@ -280,6 +280,7 @@ void *taosIterateRef(int rsetId, int64_t rid) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *newP = NULL;
|
||||||
pSet = tsRefSetList + rsetId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRsetCount(pSet);
|
taosIncRsetCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
|
@ -289,52 +290,56 @@ void *taosIterateRef(int rsetId, int64_t rid) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int hash = 0;
|
do {
|
||||||
if (rid > 0) {
|
newP = NULL;
|
||||||
hash = rid % pSet->max;
|
int hash = 0;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
if (rid > 0) {
|
||||||
|
hash = rid % pSet->max;
|
||||||
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
pNode = pSet->nodeList[hash];
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if (pNode->rid == rid) break;
|
if (pNode->rid == rid) break;
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode == NULL) {
|
||||||
|
uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
taosDecRsetCount(pSet);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// rid is there
|
||||||
|
pNode = pNode->next;
|
||||||
|
if (pNode == NULL) {
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
hash++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
|
for (; hash < pSet->max; ++hash) {
|
||||||
terrno = TSDB_CODE_REF_NOT_EXIST;
|
taosLockList(pSet->lockedBy+hash);
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
pNode = pSet->nodeList[hash];
|
||||||
return NULL;
|
if (pNode) break;
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rid is there
|
if (pNode) {
|
||||||
pNode = pNode->next;
|
pNode->count++; // acquire it
|
||||||
if (pNode == NULL) {
|
newP = pNode->p;
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
hash++;
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
|
||||||
|
} else {
|
||||||
|
uTrace("rsetId:%d the list is over", rsetId);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode == NULL) {
|
if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
|
||||||
for (; hash < pSet->max; ++hash) {
|
if (pNode) rid = pNode->rid;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
} while (newP && pNode->removed);
|
||||||
pNode = pSet->nodeList[hash];
|
|
||||||
if (pNode) break;
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void *newP = NULL;
|
|
||||||
if (pNode) {
|
|
||||||
pNode->count++; // acquire it
|
|
||||||
newP = pNode->p;
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
|
||||||
uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
|
|
||||||
} else {
|
|
||||||
uTrace("rsetId:%d the list is over", rsetId);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
|
|
||||||
|
|
||||||
taosDecRsetCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
|
|
||||||
|
@ -350,22 +355,22 @@ int taosListRef() {
|
||||||
|
|
||||||
for (int i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
for (int i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
||||||
pSet = tsRefSetList + i;
|
pSet = tsRefSetList + i;
|
||||||
|
|
||||||
if (pSet->state == TSDB_REF_STATE_EMPTY)
|
if (pSet->state == TSDB_REF_STATE_EMPTY)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
|
uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
|
||||||
|
|
||||||
for (int j=0; j < pSet->max; ++j) {
|
for (int j=0; j < pSet->max; ++j) {
|
||||||
pNode = pSet->nodeList[j];
|
pNode = pSet->nodeList[j];
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
|
uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
|
|
||||||
|
@ -397,16 +402,16 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
|
||||||
terrno = TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
hash = rid % pSet->max;
|
hash = rid % pSet->max;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
pNode = pSet->nodeList[hash];
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if (pNode->rid == rid)
|
if (pNode->rid == rid)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
|
@ -417,18 +422,18 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
|
||||||
if (pNode->prev) {
|
if (pNode->prev) {
|
||||||
pNode->prev->next = pNode->next;
|
pNode->prev->next = pNode->next;
|
||||||
} else {
|
} else {
|
||||||
pSet->nodeList[hash] = pNode->next;
|
pSet->nodeList[hash] = pNode->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode->next) {
|
if (pNode->next) {
|
||||||
pNode->next->prev = pNode->prev;
|
pNode->next->prev = pNode->prev;
|
||||||
}
|
}
|
||||||
released = 1;
|
released = 1;
|
||||||
} else {
|
} else {
|
||||||
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
|
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
|
||||||
terrno = TSDB_CODE_REF_NOT_EXIST;
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
@ -437,11 +442,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
|
||||||
|
|
||||||
if (released) {
|
if (released) {
|
||||||
uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
|
||||||
(*pSet->fp)(pNode->p);
|
(*pSet->fp)(pNode->p);
|
||||||
free(pNode);
|
free(pNode);
|
||||||
|
|
||||||
taosDecRsetCount(pSet);
|
taosDecRsetCount(pSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -493,5 +498,5 @@ static void taosDecRsetCount(SRefSet *pSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue