From 428524bd08a3daddb10974b82dca0f99c4e58112 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 3 Nov 2020 14:05:21 +0000 Subject: [PATCH] first version for taos file --- src/util/inc/tfile.h | 43 ++++++++ src/util/inc/tref.h | 37 ++++--- src/util/src/tfile.c | 94 +++++++++++++++++ src/util/src/tref.c | 246 +++++++++++++++++++++---------------------- 4 files changed, 278 insertions(+), 142 deletions(-) create mode 100644 src/util/inc/tfile.h create mode 100644 src/util/src/tfile.c diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h new file mode 100644 index 0000000000..00b2fd6c32 --- /dev/null +++ b/src/util/inc/tfile.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TFILE_H +#define TDENGINE_TFILE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +// the same syntax as UNIX standard open/close/read/write +// but FD is int64_t and will never be reused + +int64_t tfopen(const char *pathname, int flags); +int64_t tfclose(int64_t tfd); +ssize_t tfwrite(int64_t tfd, const void *buf, size_t count); +ssize_t tfread(int64_t tfd, void *buf, size_t count); + +// init taos file module +int tfinit(); + +// clean up taos fle module +void tfcleanup(); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TREF_H diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index ead8e2eb90..3e5db33cf7 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -21,38 +21,45 @@ extern "C" { #endif -// open an instance, return refId which will be used by other APIs -int taosOpenRef(int max, void (*fp)(void *)); +// open a reference set, max is the mod used by hash, fp is the pointer to free resource function +// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately +int taosOpenRef(int max, void (*fp)(void *)); -// close the Ref instance -void taosCloseRef(int refId); +// close the reference set, refId is the return value by taosOpenRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosCloseRef(int refId); // add ref, p is the pointer to resource or pointer ID -int taosAddRef(int refId, void *p); +// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately +int64_t taosAddRef(int refId, void *p); #define taosRemoveRef taosReleaseRef -// acquire ref, p is the pointer to resource or pointer ID -int taosAcquireRef(int refId, void *p); +// acquire ref, rid is the reference ID returned by taosAddRef +// return the resource p. On error, NULL is returned, and terrno is set appropriately +void *taosAcquireRef(int rsetId, int64_t rid); -// release ref, p is the pointer to resource or pinter ID -void taosReleaseRef(int refId, void *p); +// release ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosReleaseRef(int rsetId, int64_t rid); -// return the first if p is null, otherwise return the next after p -void *taosIterateRef(int refId, void *p); +// return the first reference if rid is 0, otherwise return the next after current reference. +// if return value is NULL, it means list is over(if terrno is set, it means error happens) +void *taosIterateRef(int rsetId, int64_t rid); // return the number of references in system int taosListRef(); /* sample code to iterate the refs -void demoIterateRefs(int refId) { +void demoIterateRefs(int rsetId) { - void *p = taosIterateRef(refId, NULL); + void *p = taosIterateRef(refId, 0); while (p) { - // process P + + // get the rid from p - p = taosIterateRef(refId, p); + p = taosIterateRef(rsetId, rid); } } diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c new file mode 100644 index 0000000000..ea699c2436 --- /dev/null +++ b/src/util/src/tfile.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "taoserror.h" +#include "tulog.h" +#include "tutil.h" +#include "tref.h" + +static int tsFileRsetId = -1; + +static void taosCloseFile(void *p) { + close((int)(uintptr_t)p); +} + +int tfinit() { + + tsFileRsetId = taosOpenRef(2000, taosCloseFile); + return tsFileRsetId; +} + +void tfcleanup() { + + if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); + tsFileRsetId = -1; +} + +int64_t tfopen(const char *pathname, int flags) { + int fd = open(pathname, flags); + + if (fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + int64_t rid = taosAddRef(tsFileRsetId, (void *)(long)fd); + if (rid < 0) { + close(fd); + return -1; + } + + return rid; +} + +int64_t tfclose(int64_t tfd) { + return taosReleaseRef(tsFileRsetId, tfd); +} + +ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { + + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return terrno; + + int fd = (int)(uintptr_t)p; + + ssize_t ret = write(fd, buf, count); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + +ssize_t tfread(int64_t tfd, void *buf, size_t count) { + + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return terrno; + + int fd = (int)(uintptr_t)p; + + ssize_t ret = read(fd, buf, count); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 23a7210e99..c2adb6d15b 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -24,19 +24,21 @@ #define TSDB_REF_STATE_DELETED 2 typedef struct SRefNode { - struct SRefNode *prev; - struct SRefNode *next; - void *p; - int32_t count; + struct SRefNode *prev; // previous node + struct SRefNode *next; // next node + void *p; // pointer to resource protected, + int64_t rid; // reference ID + int32_t count; // number of references } SRefNode; typedef struct { - SRefNode **nodeList; - int state; // 0: empty, 1: active; 2: deleted - int refId; - int max; - int32_t count; // total number of SRefNodes in this set - int64_t *lockedBy; + SRefNode **nodeList; // array of SRefNode linked list + int state; // 0: empty, 1: active; 2: deleted + int rsetId; // refSet ID, global unique + int64_t rid; // increase by one for each new reference + int max; // mod + int32_t count; // total number of SRefNodes in this set + int64_t *lockedBy; void (*fp)(void *); } SRefSet; @@ -47,7 +49,6 @@ static int tsRefSetNum = 0; static int tsNextId = 0; static void taosInitRefModule(void); -static int taosHashRef(SRefSet *pSet, void *p); static void taosLockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy); static void taosIncRefCount(SRefSet *pSet); @@ -58,19 +59,21 @@ int taosOpenRef(int max, void (*fp)(void *)) SRefNode **nodeList; SRefSet *pSet; int64_t *lockedBy; - int i, refId; + int i, rsetId; pthread_once(&tsRefModuleInit, taosInitRefModule); nodeList = calloc(sizeof(SRefNode *), (size_t)max); - if (nodeList == NULL) { - return TSDB_CODE_REF_NO_MEMORY; + if (nodeList == NULL) { + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; } lockedBy = calloc(sizeof(int64_t), (size_t)max); if (lockedBy == NULL) { free(nodeList); - return TSDB_CODE_REF_NO_MEMORY; + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; } pthread_mutex_lock(&tsRefMutex); @@ -81,20 +84,21 @@ int taosOpenRef(int max, void (*fp)(void *)) } if (i < TSDB_REF_OBJECTS) { - refId = tsNextId; - pSet = tsRefSetList + refId; + rsetId = tsNextId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); pSet->max = max; pSet->nodeList = nodeList; pSet->lockedBy = lockedBy; pSet->fp = fp; + pSet->rid = 1; + pSet->rsetId = rsetId; pSet->state = TSDB_REF_STATE_ACTIVE; - pSet->refId = refId; tsRefSetNum++; - uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum); + uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum); } else { - refId = TSDB_CODE_REF_FULL; + rsetId = TSDB_CODE_REF_FULL; free (nodeList); free (lockedBy); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); @@ -102,121 +106,116 @@ int taosOpenRef(int max, void (*fp)(void *)) pthread_mutex_unlock(&tsRefMutex); - return refId; + return rsetId; } -void taosCloseRef(int refId) +int taosCloseRef(int rsetId) { SRefSet *pSet; int deleted = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d is invalid, out of range", refId); - return; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d is invalid, out of range", rsetId); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; pthread_mutex_lock(&tsRefMutex); if (pSet->state == TSDB_REF_STATE_ACTIVE) { pSet->state = TSDB_REF_STATE_DELETED; deleted = 1; - uTrace("refId:%d is closed, count:%d", refId, pSet->count); + uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count); } else { - uTrace("refId:%d is already closed, count:%d", refId, pSet->count); + uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count); } pthread_mutex_unlock(&tsRefMutex); if (deleted) taosDecRefCount(pSet); + + return 0; } -int taosAddRef(int refId, void *p) +int64_t taosAddRef(int rsetId, void *p) { int hash; SRefNode *pNode; SRefSet *pSet; + int64_t rid = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to add, refId not valid", refId, p); - return TSDB_CODE_REF_INVALID_ID; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { taosDecRefCount(pSet); - uTrace("refId:%d p:%p failed to add, not active", refId, p); - return TSDB_CODE_REF_ID_REMOVED; + uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; } - int code = 0; - hash = taosHashRef(pSet, p); + pNode = calloc(sizeof(SRefNode), 1); + if (pNode == NULL) { + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; + } + rid = atomic_add_fetch_64(&pSet->rid, 1); + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->p == p) - break; + pNode->p = p; + pNode->rid = rid; + pNode->count = 1; - pNode = pNode->next; - } + pNode->prev = NULL; + pNode->next = pSet->nodeList[hash]; + if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode; + pSet->nodeList[hash] = pNode; - if (pNode) { - code = TSDB_CODE_REF_ALREADY_EXIST; - uTrace("refId:%d p:%p is already there, faild to add", refId, p); - } else { - pNode = calloc(sizeof(SRefNode), 1); - if (pNode) { - pNode->p = p; - pNode->count = 1; - pNode->prev = 0; - pNode->next = pSet->nodeList[hash]; - if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode; - pSet->nodeList[hash] = pNode; - uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode); - } else { - code = TSDB_CODE_REF_NO_MEMORY; - uTrace("refId:%d p:%p is not added, since no memory", refId, p); - } - } - - if (code < 0) taosDecRefCount(pSet); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count); taosUnlockList(pSet->lockedBy+hash); - return code; + return rid; } -int taosAcquireRef(int refId, void *p) +void *taosAcquireRef(int rsetId, int64_t rid) { - int hash, code = 0; + int hash; SRefNode *pNode; SRefSet *pSet; + void *p = NULL; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p); - return TSDB_CODE_REF_INVALID_ID; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return NULL; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - uTrace("refId:%d p:%p failed to acquire, not active", refId, p); + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid); taosDecRefCount(pSet); - return TSDB_CODE_REF_ID_REMOVED; + terrno = TSDB_CODE_REF_ID_REMOVED; + return NULL; } - hash = taosHashRef(pSet, p); - + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) { + if (pNode->rid == rid) { break; } @@ -225,44 +224,47 @@ int taosAcquireRef(int refId, void *p) if (pNode) { pNode->count++; - uTrace("refId:%d p:%p is acquired", refId, p); + p = pNode->p; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); } else { - code = TSDB_CODE_REF_NOT_EXIST; - uTrace("refId:%d p:%p is not there, failed to acquire", refId, p); + terrno = TSDB_CODE_REF_NOT_EXIST; + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid); } taosUnlockList(pSet->lockedBy+hash); taosDecRefCount(pSet); - return code; + return p; } -void taosReleaseRef(int refId, void *p) +int taosReleaseRef(int rsetId, int64_t rid) { int hash; SRefNode *pNode; SRefSet *pSet; int released = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to release, refId not valid", refId, p); - return; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to release, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; if (pSet->state == TSDB_REF_STATE_EMPTY) { - uTrace("refId:%d p:%p failed to release, cleaned", refId, p); - return; + uTrace("rsetId:%d rid:%" PRId64 " failed to release, cleaned", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; } - hash = taosHashRef(pSet, p); - + terrno = 0; + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) + if (pNode->rid == rid) break; pNode = pNode->next; @@ -284,57 +286,63 @@ void taosReleaseRef(int refId, void *p) (*pSet->fp)(pNode->p); + uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); free(pNode); released = 1; - uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode); } else { - uTrace("refId:%d p:%p is released", refId, p); + uTrace("rsetId:%d p:%p rid:%" PRId64 "is released", rsetId, pNode->p, rid); } } else { - uTrace("refId:%d p:%p is not there, failed to release", refId, p); + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; } taosUnlockList(pSet->lockedBy+hash); if (released) taosDecRefCount(pSet); + + return terrno; } -// if p is NULL, return the first p in hash list, otherwise, return the next after p -void *taosIterateRef(int refId, void *p) { +// if rid is 0, return the first p in hash list, otherwise, return the next after current rid +void *taosIterateRef(int rsetId, int64_t rid) { SRefNode *pNode = NULL; SRefSet *pSet; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; return NULL; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - uTrace("refId:%d p:%p failed to iterate, not active", refId, p); + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; taosDecRefCount(pSet); return NULL; } int hash = 0; - if (p) { - hash = taosHashRef(pSet, p); + if (rid > 0) { + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) break; + if (pNode->rid == rid) break; pNode = pNode->next; } if (pNode == NULL) { - uError("refId:%d p:%p not there, quit", refId, p); + uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; taosUnlockList(pSet->lockedBy+hash); return NULL; } - // p is there + // rid is there pNode = pNode->next; if (pNode == NULL) { taosUnlockList(pSet->lockedBy+hash); @@ -356,12 +364,12 @@ void *taosIterateRef(int refId, void *p) { pNode->count++; // acquire it newP = pNode->p; taosUnlockList(pSet->lockedBy+hash); - uTrace("refId:%d p:%p is returned", refId, p); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); } else { - uTrace("refId:%d p:%p the list is over", refId, p); + uTrace("rsetId:%d the list is over", rsetId); } - if (p) taosReleaseRef(refId, p); // release the current one + if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one taosDecRefCount(pSet); @@ -381,13 +389,13 @@ int taosListRef() { if (pSet->state == TSDB_REF_STATE_EMPTY) continue; - uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count); + uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count); for (int j=0; j < pSet->max; ++j) { pNode = pSet->nodeList[j]; while (pNode) { - uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count); + uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count); pNode = pNode->next; num++; } @@ -399,22 +407,6 @@ int taosListRef() { return num; } -static int taosHashRef(SRefSet *pSet, void *p) -{ - int hash = 0; - int64_t v = (int64_t)p; - - for (int i = 0; i < sizeof(v); ++i) { - hash += (int)(v & 0xFFFF); - v = v >> 16; - i = i + 2; - } - - hash = hash % pSet->max; - - return hash; -} - static void taosLockList(int64_t *lockedBy) { int64_t tid = taosGetPthreadId(); int i = 0; @@ -438,12 +430,12 @@ static void taosInitRefModule(void) { static void taosIncRefCount(SRefSet *pSet) { atomic_add_fetch_32(&pSet->count, 1); - uTrace("refId:%d inc count:%d", pSet->refId, pSet->count); + uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count); } static void taosDecRefCount(SRefSet *pSet) { int32_t count = atomic_sub_fetch_32(&pSet->count, 1); - uTrace("refId:%d dec count:%d", pSet->refId, pSet->count); + uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count); if (count > 0) return; @@ -458,7 +450,7 @@ static void taosDecRefCount(SRefSet *pSet) { taosTFree(pSet->lockedBy); tsRefSetNum--; - uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count); + uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count); } pthread_mutex_unlock(&tsRefMutex);