first version for taos file
This commit is contained in:
parent
e2cc48af9e
commit
428524bd08
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_TFILE_H
|
||||||
|
#define TDENGINE_TFILE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
// the same syntax as UNIX standard open/close/read/write
|
||||||
|
// but FD is int64_t and will never be reused
|
||||||
|
|
||||||
|
int64_t tfopen(const char *pathname, int flags);
|
||||||
|
int64_t tfclose(int64_t tfd);
|
||||||
|
ssize_t tfwrite(int64_t tfd, const void *buf, size_t count);
|
||||||
|
ssize_t tfread(int64_t tfd, void *buf, size_t count);
|
||||||
|
|
||||||
|
// init taos file module
|
||||||
|
int tfinit();
|
||||||
|
|
||||||
|
// clean up taos fle module
|
||||||
|
void tfcleanup();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_TREF_H
|
|
@ -21,38 +21,45 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// open an instance, return refId which will be used by other APIs
|
// open a reference set, max is the mod used by hash, fp is the pointer to free resource function
|
||||||
int taosOpenRef(int max, void (*fp)(void *));
|
// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
|
||||||
|
int taosOpenRef(int max, void (*fp)(void *));
|
||||||
|
|
||||||
// close the Ref instance
|
// close the reference set, refId is the return value by taosOpenRef
|
||||||
void taosCloseRef(int refId);
|
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
|
||||||
|
int taosCloseRef(int refId);
|
||||||
|
|
||||||
// add ref, p is the pointer to resource or pointer ID
|
// add ref, p is the pointer to resource or pointer ID
|
||||||
int taosAddRef(int refId, void *p);
|
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
|
||||||
|
int64_t taosAddRef(int refId, void *p);
|
||||||
#define taosRemoveRef taosReleaseRef
|
#define taosRemoveRef taosReleaseRef
|
||||||
|
|
||||||
// acquire ref, p is the pointer to resource or pointer ID
|
// acquire ref, rid is the reference ID returned by taosAddRef
|
||||||
int taosAcquireRef(int refId, void *p);
|
// return the resource p. On error, NULL is returned, and terrno is set appropriately
|
||||||
|
void *taosAcquireRef(int rsetId, int64_t rid);
|
||||||
|
|
||||||
// release ref, p is the pointer to resource or pinter ID
|
// release ref, rid is the reference ID returned by taosAddRef
|
||||||
void taosReleaseRef(int refId, void *p);
|
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
|
||||||
|
int taosReleaseRef(int rsetId, int64_t rid);
|
||||||
|
|
||||||
// return the first if p is null, otherwise return the next after p
|
// return the first reference if rid is 0, otherwise return the next after current reference.
|
||||||
void *taosIterateRef(int refId, void *p);
|
// if return value is NULL, it means list is over(if terrno is set, it means error happens)
|
||||||
|
void *taosIterateRef(int rsetId, int64_t rid);
|
||||||
|
|
||||||
// return the number of references in system
|
// return the number of references in system
|
||||||
int taosListRef();
|
int taosListRef();
|
||||||
|
|
||||||
/* sample code to iterate the refs
|
/* sample code to iterate the refs
|
||||||
|
|
||||||
void demoIterateRefs(int refId) {
|
void demoIterateRefs(int rsetId) {
|
||||||
|
|
||||||
void *p = taosIterateRef(refId, NULL);
|
void *p = taosIterateRef(refId, 0);
|
||||||
while (p) {
|
while (p) {
|
||||||
|
|
||||||
// process P
|
// process P
|
||||||
|
|
||||||
|
// get the rid from p
|
||||||
|
|
||||||
p = taosIterateRef(refId, p);
|
p = taosIterateRef(rsetId, rid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tulog.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "tref.h"
|
||||||
|
|
||||||
|
static int tsFileRsetId = -1;
|
||||||
|
|
||||||
|
static void taosCloseFile(void *p) {
|
||||||
|
close((int)(uintptr_t)p);
|
||||||
|
}
|
||||||
|
|
||||||
|
int tfinit() {
|
||||||
|
|
||||||
|
tsFileRsetId = taosOpenRef(2000, taosCloseFile);
|
||||||
|
return tsFileRsetId;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfcleanup() {
|
||||||
|
|
||||||
|
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
|
||||||
|
tsFileRsetId = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tfopen(const char *pathname, int flags) {
|
||||||
|
int fd = open(pathname, flags);
|
||||||
|
|
||||||
|
if (fd < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t rid = taosAddRef(tsFileRsetId, (void *)(long)fd);
|
||||||
|
if (rid < 0) {
|
||||||
|
close(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rid;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tfclose(int64_t tfd) {
|
||||||
|
return taosReleaseRef(tsFileRsetId, tfd);
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) {
|
||||||
|
|
||||||
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
|
if (p == NULL) return terrno;
|
||||||
|
|
||||||
|
int fd = (int)(uintptr_t)p;
|
||||||
|
|
||||||
|
ssize_t ret = write(fd, buf, count);
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosReleaseRef(tsFileRsetId, tfd);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize_t tfread(int64_t tfd, void *buf, size_t count) {
|
||||||
|
|
||||||
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
|
if (p == NULL) return terrno;
|
||||||
|
|
||||||
|
int fd = (int)(uintptr_t)p;
|
||||||
|
|
||||||
|
ssize_t ret = read(fd, buf, count);
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosReleaseRef(tsFileRsetId, tfd);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
|
@ -24,19 +24,21 @@
|
||||||
#define TSDB_REF_STATE_DELETED 2
|
#define TSDB_REF_STATE_DELETED 2
|
||||||
|
|
||||||
typedef struct SRefNode {
|
typedef struct SRefNode {
|
||||||
struct SRefNode *prev;
|
struct SRefNode *prev; // previous node
|
||||||
struct SRefNode *next;
|
struct SRefNode *next; // next node
|
||||||
void *p;
|
void *p; // pointer to resource protected,
|
||||||
int32_t count;
|
int64_t rid; // reference ID
|
||||||
|
int32_t count; // number of references
|
||||||
} SRefNode;
|
} SRefNode;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRefNode **nodeList;
|
SRefNode **nodeList; // array of SRefNode linked list
|
||||||
int state; // 0: empty, 1: active; 2: deleted
|
int state; // 0: empty, 1: active; 2: deleted
|
||||||
int refId;
|
int rsetId; // refSet ID, global unique
|
||||||
int max;
|
int64_t rid; // increase by one for each new reference
|
||||||
int32_t count; // total number of SRefNodes in this set
|
int max; // mod
|
||||||
int64_t *lockedBy;
|
int32_t count; // total number of SRefNodes in this set
|
||||||
|
int64_t *lockedBy;
|
||||||
void (*fp)(void *);
|
void (*fp)(void *);
|
||||||
} SRefSet;
|
} SRefSet;
|
||||||
|
|
||||||
|
@ -47,7 +49,6 @@ static int tsRefSetNum = 0;
|
||||||
static int tsNextId = 0;
|
static int tsNextId = 0;
|
||||||
|
|
||||||
static void taosInitRefModule(void);
|
static void taosInitRefModule(void);
|
||||||
static int taosHashRef(SRefSet *pSet, void *p);
|
|
||||||
static void taosLockList(int64_t *lockedBy);
|
static void taosLockList(int64_t *lockedBy);
|
||||||
static void taosUnlockList(int64_t *lockedBy);
|
static void taosUnlockList(int64_t *lockedBy);
|
||||||
static void taosIncRefCount(SRefSet *pSet);
|
static void taosIncRefCount(SRefSet *pSet);
|
||||||
|
@ -58,19 +59,21 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
SRefNode **nodeList;
|
SRefNode **nodeList;
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
int64_t *lockedBy;
|
int64_t *lockedBy;
|
||||||
int i, refId;
|
int i, rsetId;
|
||||||
|
|
||||||
pthread_once(&tsRefModuleInit, taosInitRefModule);
|
pthread_once(&tsRefModuleInit, taosInitRefModule);
|
||||||
|
|
||||||
nodeList = calloc(sizeof(SRefNode *), (size_t)max);
|
nodeList = calloc(sizeof(SRefNode *), (size_t)max);
|
||||||
if (nodeList == NULL) {
|
if (nodeList == NULL) {
|
||||||
return TSDB_CODE_REF_NO_MEMORY;
|
terrno = TSDB_CODE_REF_NO_MEMORY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
lockedBy = calloc(sizeof(int64_t), (size_t)max);
|
lockedBy = calloc(sizeof(int64_t), (size_t)max);
|
||||||
if (lockedBy == NULL) {
|
if (lockedBy == NULL) {
|
||||||
free(nodeList);
|
free(nodeList);
|
||||||
return TSDB_CODE_REF_NO_MEMORY;
|
terrno = TSDB_CODE_REF_NO_MEMORY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&tsRefMutex);
|
pthread_mutex_lock(&tsRefMutex);
|
||||||
|
@ -81,20 +84,21 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i < TSDB_REF_OBJECTS) {
|
if (i < TSDB_REF_OBJECTS) {
|
||||||
refId = tsNextId;
|
rsetId = tsNextId;
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRefCount(pSet);
|
||||||
pSet->max = max;
|
pSet->max = max;
|
||||||
pSet->nodeList = nodeList;
|
pSet->nodeList = nodeList;
|
||||||
pSet->lockedBy = lockedBy;
|
pSet->lockedBy = lockedBy;
|
||||||
pSet->fp = fp;
|
pSet->fp = fp;
|
||||||
|
pSet->rid = 1;
|
||||||
|
pSet->rsetId = rsetId;
|
||||||
pSet->state = TSDB_REF_STATE_ACTIVE;
|
pSet->state = TSDB_REF_STATE_ACTIVE;
|
||||||
pSet->refId = refId;
|
|
||||||
|
|
||||||
tsRefSetNum++;
|
tsRefSetNum++;
|
||||||
uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum);
|
uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
|
||||||
} else {
|
} else {
|
||||||
refId = TSDB_CODE_REF_FULL;
|
rsetId = TSDB_CODE_REF_FULL;
|
||||||
free (nodeList);
|
free (nodeList);
|
||||||
free (lockedBy);
|
free (lockedBy);
|
||||||
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
|
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
|
||||||
|
@ -102,121 +106,116 @@ int taosOpenRef(int max, void (*fp)(void *))
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
|
|
||||||
return refId;
|
return rsetId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseRef(int refId)
|
int taosCloseRef(int rsetId)
|
||||||
{
|
{
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
int deleted = 0;
|
int deleted = 0;
|
||||||
|
|
||||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
||||||
uTrace("refId:%d is invalid, out of range", refId);
|
uTrace("rsetId:%d is invalid, out of range", rsetId);
|
||||||
return;
|
terrno = TSDB_CODE_REF_INVALID_ID;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + rsetId;
|
||||||
|
|
||||||
pthread_mutex_lock(&tsRefMutex);
|
pthread_mutex_lock(&tsRefMutex);
|
||||||
|
|
||||||
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
|
||||||
pSet->state = TSDB_REF_STATE_DELETED;
|
pSet->state = TSDB_REF_STATE_DELETED;
|
||||||
deleted = 1;
|
deleted = 1;
|
||||||
uTrace("refId:%d is closed, count:%d", refId, pSet->count);
|
uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
|
||||||
} else {
|
} else {
|
||||||
uTrace("refId:%d is already closed, count:%d", refId, pSet->count);
|
uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
|
|
||||||
if (deleted) taosDecRefCount(pSet);
|
if (deleted) taosDecRefCount(pSet);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosAddRef(int refId, void *p)
|
int64_t taosAddRef(int rsetId, void *p)
|
||||||
{
|
{
|
||||||
int hash;
|
int hash;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
|
int64_t rid = 0;
|
||||||
|
|
||||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
||||||
uTrace("refId:%d p:%p failed to add, refId not valid", refId, p);
|
uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p);
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
terrno = TSDB_CODE_REF_INVALID_ID;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRefCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
taosDecRefCount(pSet);
|
taosDecRefCount(pSet);
|
||||||
uTrace("refId:%d p:%p failed to add, not active", refId, p);
|
uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
|
||||||
return TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = 0;
|
pNode = calloc(sizeof(SRefNode), 1);
|
||||||
hash = taosHashRef(pSet, p);
|
if (pNode == NULL) {
|
||||||
|
terrno = TSDB_CODE_REF_NO_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
rid = atomic_add_fetch_64(&pSet->rid, 1);
|
||||||
|
hash = rid % pSet->max;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
pNode->p = p;
|
||||||
while (pNode) {
|
pNode->rid = rid;
|
||||||
if (pNode->p == p)
|
pNode->count = 1;
|
||||||
break;
|
|
||||||
|
|
||||||
pNode = pNode->next;
|
pNode->prev = NULL;
|
||||||
}
|
pNode->next = pSet->nodeList[hash];
|
||||||
|
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
|
||||||
|
pSet->nodeList[hash] = pNode;
|
||||||
|
|
||||||
if (pNode) {
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count);
|
||||||
code = TSDB_CODE_REF_ALREADY_EXIST;
|
|
||||||
uTrace("refId:%d p:%p is already there, faild to add", refId, p);
|
|
||||||
} else {
|
|
||||||
pNode = calloc(sizeof(SRefNode), 1);
|
|
||||||
if (pNode) {
|
|
||||||
pNode->p = p;
|
|
||||||
pNode->count = 1;
|
|
||||||
pNode->prev = 0;
|
|
||||||
pNode->next = pSet->nodeList[hash];
|
|
||||||
if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
|
|
||||||
pSet->nodeList[hash] = pNode;
|
|
||||||
uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode);
|
|
||||||
} else {
|
|
||||||
code = TSDB_CODE_REF_NO_MEMORY;
|
|
||||||
uTrace("refId:%d p:%p is not added, since no memory", refId, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code < 0) taosDecRefCount(pSet);
|
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
return code;
|
return rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosAcquireRef(int refId, void *p)
|
void *taosAcquireRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
int hash, code = 0;
|
int hash;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
|
void *p = NULL;
|
||||||
|
|
||||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
||||||
uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid);
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
terrno = TSDB_CODE_REF_INVALID_ID;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRefCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
uTrace("refId:%d p:%p failed to acquire, not active", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
|
||||||
taosDecRefCount(pSet);
|
taosDecRefCount(pSet);
|
||||||
return TSDB_CODE_REF_ID_REMOVED;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
hash = taosHashRef(pSet, p);
|
hash = rid % pSet->max;
|
||||||
|
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
pNode = pSet->nodeList[hash];
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if (pNode->p == p) {
|
if (pNode->rid == rid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,44 +224,47 @@ int taosAcquireRef(int refId, void *p)
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
pNode->count++;
|
pNode->count++;
|
||||||
uTrace("refId:%d p:%p is acquired", refId, p);
|
p = pNode->p;
|
||||||
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_REF_NOT_EXIST;
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
uTrace("refId:%d p:%p is not there, failed to acquire", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
taosDecRefCount(pSet);
|
taosDecRefCount(pSet);
|
||||||
|
|
||||||
return code;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosReleaseRef(int refId, void *p)
|
int taosReleaseRef(int rsetId, int64_t rid)
|
||||||
{
|
{
|
||||||
int hash;
|
int hash;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
int released = 0;
|
int released = 0;
|
||||||
|
|
||||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
||||||
uTrace("refId:%d p:%p failed to release, refId not valid", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to release, rsetId not valid", rsetId, rid);
|
||||||
return;
|
terrno = TSDB_CODE_REF_INVALID_ID;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + rsetId;
|
||||||
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
||||||
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to release, cleaned", rsetId, rid);
|
||||||
return;
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
hash = taosHashRef(pSet, p);
|
terrno = 0;
|
||||||
|
hash = rid % pSet->max;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
pNode = pSet->nodeList[hash];
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if (pNode->p == p)
|
if (pNode->rid == rid)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
|
@ -284,57 +286,63 @@ void taosReleaseRef(int refId, void *p)
|
||||||
|
|
||||||
(*pSet->fp)(pNode->p);
|
(*pSet->fp)(pNode->p);
|
||||||
|
|
||||||
|
uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
|
||||||
free(pNode);
|
free(pNode);
|
||||||
released = 1;
|
released = 1;
|
||||||
uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode);
|
|
||||||
} else {
|
} else {
|
||||||
uTrace("refId:%d p:%p is released", refId, p);
|
uTrace("rsetId:%d p:%p rid:%" PRId64 "is released", rsetId, pNode->p, rid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uTrace("refId:%d p:%p is not there, failed to release", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
if (released) taosDecRefCount(pSet);
|
if (released) taosDecRefCount(pSet);
|
||||||
|
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if p is NULL, return the first p in hash list, otherwise, return the next after p
|
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
|
||||||
void *taosIterateRef(int refId, void *p) {
|
void *taosIterateRef(int rsetId, int64_t rid) {
|
||||||
SRefNode *pNode = NULL;
|
SRefNode *pNode = NULL;
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
|
|
||||||
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
|
||||||
uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_INVALID_ID;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + rsetId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRefCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
uTrace("refId:%d p:%p failed to iterate, not active", refId, p);
|
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_ID_REMOVED;
|
||||||
taosDecRefCount(pSet);
|
taosDecRefCount(pSet);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int hash = 0;
|
int hash = 0;
|
||||||
if (p) {
|
if (rid > 0) {
|
||||||
hash = taosHashRef(pSet, p);
|
hash = rid % pSet->max;
|
||||||
taosLockList(pSet->lockedBy+hash);
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
pNode = pSet->nodeList[hash];
|
pNode = pSet->nodeList[hash];
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
if (pNode->p == p) break;
|
if (pNode->rid == rid) break;
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
uError("refId:%d p:%p not there, quit", refId, p);
|
uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
|
||||||
|
terrno = TSDB_CODE_REF_NOT_EXIST;
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// p is there
|
// rid is there
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
@ -356,12 +364,12 @@ void *taosIterateRef(int refId, void *p) {
|
||||||
pNode->count++; // acquire it
|
pNode->count++; // acquire it
|
||||||
newP = pNode->p;
|
newP = pNode->p;
|
||||||
taosUnlockList(pSet->lockedBy+hash);
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
uTrace("refId:%d p:%p is returned", refId, p);
|
uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
|
||||||
} else {
|
} else {
|
||||||
uTrace("refId:%d p:%p the list is over", refId, p);
|
uTrace("rsetId:%d the list is over", rsetId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p) taosReleaseRef(refId, p); // release the current one
|
if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
|
||||||
|
|
||||||
taosDecRefCount(pSet);
|
taosDecRefCount(pSet);
|
||||||
|
|
||||||
|
@ -381,13 +389,13 @@ int taosListRef() {
|
||||||
if (pSet->state == TSDB_REF_STATE_EMPTY)
|
if (pSet->state == TSDB_REF_STATE_EMPTY)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count);
|
uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
|
||||||
|
|
||||||
for (int j=0; j < pSet->max; ++j) {
|
for (int j=0; j < pSet->max; ++j) {
|
||||||
pNode = pSet->nodeList[j];
|
pNode = pSet->nodeList[j];
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count);
|
uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
|
@ -399,22 +407,6 @@ int taosListRef() {
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int taosHashRef(SRefSet *pSet, void *p)
|
|
||||||
{
|
|
||||||
int hash = 0;
|
|
||||||
int64_t v = (int64_t)p;
|
|
||||||
|
|
||||||
for (int i = 0; i < sizeof(v); ++i) {
|
|
||||||
hash += (int)(v & 0xFFFF);
|
|
||||||
v = v >> 16;
|
|
||||||
i = i + 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
hash = hash % pSet->max;
|
|
||||||
|
|
||||||
return hash;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosLockList(int64_t *lockedBy) {
|
static void taosLockList(int64_t *lockedBy) {
|
||||||
int64_t tid = taosGetPthreadId();
|
int64_t tid = taosGetPthreadId();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -438,12 +430,12 @@ static void taosInitRefModule(void) {
|
||||||
|
|
||||||
static void taosIncRefCount(SRefSet *pSet) {
|
static void taosIncRefCount(SRefSet *pSet) {
|
||||||
atomic_add_fetch_32(&pSet->count, 1);
|
atomic_add_fetch_32(&pSet->count, 1);
|
||||||
uTrace("refId:%d inc count:%d", pSet->refId, pSet->count);
|
uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosDecRefCount(SRefSet *pSet) {
|
static void taosDecRefCount(SRefSet *pSet) {
|
||||||
int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
|
int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
|
||||||
uTrace("refId:%d dec count:%d", pSet->refId, pSet->count);
|
uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count);
|
||||||
|
|
||||||
if (count > 0) return;
|
if (count > 0) return;
|
||||||
|
|
||||||
|
@ -458,7 +450,7 @@ static void taosDecRefCount(SRefSet *pSet) {
|
||||||
taosTFree(pSet->lockedBy);
|
taosTFree(pSet->lockedBy);
|
||||||
|
|
||||||
tsRefSetNum--;
|
tsRefSetNum--;
|
||||||
uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count);
|
uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&tsRefMutex);
|
pthread_mutex_unlock(&tsRefMutex);
|
||||||
|
|
Loading…
Reference in New Issue