From 428524bd08a3daddb10974b82dca0f99c4e58112 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 3 Nov 2020 14:05:21 +0000 Subject: [PATCH 01/13] first version for taos file --- src/util/inc/tfile.h | 43 ++++++++ src/util/inc/tref.h | 37 ++++--- src/util/src/tfile.c | 94 +++++++++++++++++ src/util/src/tref.c | 246 +++++++++++++++++++++---------------------- 4 files changed, 278 insertions(+), 142 deletions(-) create mode 100644 src/util/inc/tfile.h create mode 100644 src/util/src/tfile.c diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h new file mode 100644 index 0000000000..00b2fd6c32 --- /dev/null +++ b/src/util/inc/tfile.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TFILE_H +#define TDENGINE_TFILE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +// the same syntax as UNIX standard open/close/read/write +// but FD is int64_t and will never be reused + +int64_t tfopen(const char *pathname, int flags); +int64_t tfclose(int64_t tfd); +ssize_t tfwrite(int64_t tfd, const void *buf, size_t count); +ssize_t tfread(int64_t tfd, void *buf, size_t count); + +// init taos file module +int tfinit(); + +// clean up taos fle module +void tfcleanup(); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TREF_H diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index ead8e2eb90..3e5db33cf7 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -21,38 +21,45 @@ extern "C" { #endif -// open an instance, return refId which will be used by other APIs -int taosOpenRef(int max, void (*fp)(void *)); +// open a reference set, max is the mod used by hash, fp is the pointer to free resource function +// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately +int taosOpenRef(int max, void (*fp)(void *)); -// close the Ref instance -void taosCloseRef(int refId); +// close the reference set, refId is the return value by taosOpenRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosCloseRef(int refId); // add ref, p is the pointer to resource or pointer ID -int taosAddRef(int refId, void *p); +// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately +int64_t taosAddRef(int refId, void *p); #define taosRemoveRef taosReleaseRef -// acquire ref, p is the pointer to resource or pointer ID -int taosAcquireRef(int refId, void *p); +// acquire ref, rid is the reference ID returned by taosAddRef +// return the resource p. On error, NULL is returned, and terrno is set appropriately +void *taosAcquireRef(int rsetId, int64_t rid); -// release ref, p is the pointer to resource or pinter ID -void taosReleaseRef(int refId, void *p); +// release ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosReleaseRef(int rsetId, int64_t rid); -// return the first if p is null, otherwise return the next after p -void *taosIterateRef(int refId, void *p); +// return the first reference if rid is 0, otherwise return the next after current reference. +// if return value is NULL, it means list is over(if terrno is set, it means error happens) +void *taosIterateRef(int rsetId, int64_t rid); // return the number of references in system int taosListRef(); /* sample code to iterate the refs -void demoIterateRefs(int refId) { +void demoIterateRefs(int rsetId) { - void *p = taosIterateRef(refId, NULL); + void *p = taosIterateRef(refId, 0); while (p) { - // process P + + // get the rid from p - p = taosIterateRef(refId, p); + p = taosIterateRef(rsetId, rid); } } diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c new file mode 100644 index 0000000000..ea699c2436 --- /dev/null +++ b/src/util/src/tfile.c @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "taoserror.h" +#include "tulog.h" +#include "tutil.h" +#include "tref.h" + +static int tsFileRsetId = -1; + +static void taosCloseFile(void *p) { + close((int)(uintptr_t)p); +} + +int tfinit() { + + tsFileRsetId = taosOpenRef(2000, taosCloseFile); + return tsFileRsetId; +} + +void tfcleanup() { + + if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); + tsFileRsetId = -1; +} + +int64_t tfopen(const char *pathname, int flags) { + int fd = open(pathname, flags); + + if (fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + int64_t rid = taosAddRef(tsFileRsetId, (void *)(long)fd); + if (rid < 0) { + close(fd); + return -1; + } + + return rid; +} + +int64_t tfclose(int64_t tfd) { + return taosReleaseRef(tsFileRsetId, tfd); +} + +ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { + + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return terrno; + + int fd = (int)(uintptr_t)p; + + ssize_t ret = write(fd, buf, count); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + +ssize_t tfread(int64_t tfd, void *buf, size_t count) { + + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return terrno; + + int fd = (int)(uintptr_t)p; + + ssize_t ret = read(fd, buf, count); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 23a7210e99..c2adb6d15b 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -24,19 +24,21 @@ #define TSDB_REF_STATE_DELETED 2 typedef struct SRefNode { - struct SRefNode *prev; - struct SRefNode *next; - void *p; - int32_t count; + struct SRefNode *prev; // previous node + struct SRefNode *next; // next node + void *p; // pointer to resource protected, + int64_t rid; // reference ID + int32_t count; // number of references } SRefNode; typedef struct { - SRefNode **nodeList; - int state; // 0: empty, 1: active; 2: deleted - int refId; - int max; - int32_t count; // total number of SRefNodes in this set - int64_t *lockedBy; + SRefNode **nodeList; // array of SRefNode linked list + int state; // 0: empty, 1: active; 2: deleted + int rsetId; // refSet ID, global unique + int64_t rid; // increase by one for each new reference + int max; // mod + int32_t count; // total number of SRefNodes in this set + int64_t *lockedBy; void (*fp)(void *); } SRefSet; @@ -47,7 +49,6 @@ static int tsRefSetNum = 0; static int tsNextId = 0; static void taosInitRefModule(void); -static int taosHashRef(SRefSet *pSet, void *p); static void taosLockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy); static void taosIncRefCount(SRefSet *pSet); @@ -58,19 +59,21 @@ int taosOpenRef(int max, void (*fp)(void *)) SRefNode **nodeList; SRefSet *pSet; int64_t *lockedBy; - int i, refId; + int i, rsetId; pthread_once(&tsRefModuleInit, taosInitRefModule); nodeList = calloc(sizeof(SRefNode *), (size_t)max); - if (nodeList == NULL) { - return TSDB_CODE_REF_NO_MEMORY; + if (nodeList == NULL) { + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; } lockedBy = calloc(sizeof(int64_t), (size_t)max); if (lockedBy == NULL) { free(nodeList); - return TSDB_CODE_REF_NO_MEMORY; + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; } pthread_mutex_lock(&tsRefMutex); @@ -81,20 +84,21 @@ int taosOpenRef(int max, void (*fp)(void *)) } if (i < TSDB_REF_OBJECTS) { - refId = tsNextId; - pSet = tsRefSetList + refId; + rsetId = tsNextId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); pSet->max = max; pSet->nodeList = nodeList; pSet->lockedBy = lockedBy; pSet->fp = fp; + pSet->rid = 1; + pSet->rsetId = rsetId; pSet->state = TSDB_REF_STATE_ACTIVE; - pSet->refId = refId; tsRefSetNum++; - uTrace("refId:%d is opened, max:%d, fp:%p refSetNum:%d", refId, max, fp, tsRefSetNum); + uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum); } else { - refId = TSDB_CODE_REF_FULL; + rsetId = TSDB_CODE_REF_FULL; free (nodeList); free (lockedBy); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); @@ -102,121 +106,116 @@ int taosOpenRef(int max, void (*fp)(void *)) pthread_mutex_unlock(&tsRefMutex); - return refId; + return rsetId; } -void taosCloseRef(int refId) +int taosCloseRef(int rsetId) { SRefSet *pSet; int deleted = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d is invalid, out of range", refId); - return; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d is invalid, out of range", rsetId); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; pthread_mutex_lock(&tsRefMutex); if (pSet->state == TSDB_REF_STATE_ACTIVE) { pSet->state = TSDB_REF_STATE_DELETED; deleted = 1; - uTrace("refId:%d is closed, count:%d", refId, pSet->count); + uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count); } else { - uTrace("refId:%d is already closed, count:%d", refId, pSet->count); + uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count); } pthread_mutex_unlock(&tsRefMutex); if (deleted) taosDecRefCount(pSet); + + return 0; } -int taosAddRef(int refId, void *p) +int64_t taosAddRef(int rsetId, void *p) { int hash; SRefNode *pNode; SRefSet *pSet; + int64_t rid = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to add, refId not valid", refId, p); - return TSDB_CODE_REF_INVALID_ID; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { taosDecRefCount(pSet); - uTrace("refId:%d p:%p failed to add, not active", refId, p); - return TSDB_CODE_REF_ID_REMOVED; + uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; } - int code = 0; - hash = taosHashRef(pSet, p); + pNode = calloc(sizeof(SRefNode), 1); + if (pNode == NULL) { + terrno = TSDB_CODE_REF_NO_MEMORY; + return -1; + } + rid = atomic_add_fetch_64(&pSet->rid, 1); + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->p == p) - break; + pNode->p = p; + pNode->rid = rid; + pNode->count = 1; - pNode = pNode->next; - } + pNode->prev = NULL; + pNode->next = pSet->nodeList[hash]; + if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode; + pSet->nodeList[hash] = pNode; - if (pNode) { - code = TSDB_CODE_REF_ALREADY_EXIST; - uTrace("refId:%d p:%p is already there, faild to add", refId, p); - } else { - pNode = calloc(sizeof(SRefNode), 1); - if (pNode) { - pNode->p = p; - pNode->count = 1; - pNode->prev = 0; - pNode->next = pSet->nodeList[hash]; - if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode; - pSet->nodeList[hash] = pNode; - uTrace("refId:%d p:%p is added, count:%d malloc mem: %p", refId, p, pSet->count, pNode); - } else { - code = TSDB_CODE_REF_NO_MEMORY; - uTrace("refId:%d p:%p is not added, since no memory", refId, p); - } - } - - if (code < 0) taosDecRefCount(pSet); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count); taosUnlockList(pSet->lockedBy+hash); - return code; + return rid; } -int taosAcquireRef(int refId, void *p) +void *taosAcquireRef(int rsetId, int64_t rid) { - int hash, code = 0; + int hash; SRefNode *pNode; SRefSet *pSet; + void *p = NULL; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to acquire, refId not valid", refId, p); - return TSDB_CODE_REF_INVALID_ID; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return NULL; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - uTrace("refId:%d p:%p failed to acquire, not active", refId, p); + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid); taosDecRefCount(pSet); - return TSDB_CODE_REF_ID_REMOVED; + terrno = TSDB_CODE_REF_ID_REMOVED; + return NULL; } - hash = taosHashRef(pSet, p); - + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) { + if (pNode->rid == rid) { break; } @@ -225,44 +224,47 @@ int taosAcquireRef(int refId, void *p) if (pNode) { pNode->count++; - uTrace("refId:%d p:%p is acquired", refId, p); + p = pNode->p; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); } else { - code = TSDB_CODE_REF_NOT_EXIST; - uTrace("refId:%d p:%p is not there, failed to acquire", refId, p); + terrno = TSDB_CODE_REF_NOT_EXIST; + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid); } taosUnlockList(pSet->lockedBy+hash); taosDecRefCount(pSet); - return code; + return p; } -void taosReleaseRef(int refId, void *p) +int taosReleaseRef(int rsetId, int64_t rid) { int hash; SRefNode *pNode; SRefSet *pSet; int released = 0; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to release, refId not valid", refId, p); - return; + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to release, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; if (pSet->state == TSDB_REF_STATE_EMPTY) { - uTrace("refId:%d p:%p failed to release, cleaned", refId, p); - return; + uTrace("rsetId:%d rid:%" PRId64 " failed to release, cleaned", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; } - hash = taosHashRef(pSet, p); - + terrno = 0; + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) + if (pNode->rid == rid) break; pNode = pNode->next; @@ -284,57 +286,63 @@ void taosReleaseRef(int refId, void *p) (*pSet->fp)(pNode->p); + uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); free(pNode); released = 1; - uTrace("refId:%d p:%p is removed, count:%d, free mem: %p", refId, p, pSet->count, pNode); } else { - uTrace("refId:%d p:%p is released", refId, p); + uTrace("rsetId:%d p:%p rid:%" PRId64 "is released", rsetId, pNode->p, rid); } } else { - uTrace("refId:%d p:%p is not there, failed to release", refId, p); + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; } taosUnlockList(pSet->lockedBy+hash); if (released) taosDecRefCount(pSet); + + return terrno; } -// if p is NULL, return the first p in hash list, otherwise, return the next after p -void *taosIterateRef(int refId, void *p) { +// if rid is 0, return the first p in hash list, otherwise, return the next after current rid +void *taosIterateRef(int rsetId, int64_t rid) { SRefNode *pNode = NULL; SRefSet *pSet; - if (refId < 0 || refId >= TSDB_REF_OBJECTS) { - uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; return NULL; } - pSet = tsRefSetList + refId; + pSet = tsRefSetList + rsetId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - uTrace("refId:%d p:%p failed to iterate, not active", refId, p); + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; taosDecRefCount(pSet); return NULL; } int hash = 0; - if (p) { - hash = taosHashRef(pSet, p); + if (rid > 0) { + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; while (pNode) { - if (pNode->p == p) break; + if (pNode->rid == rid) break; pNode = pNode->next; } if (pNode == NULL) { - uError("refId:%d p:%p not there, quit", refId, p); + uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; taosUnlockList(pSet->lockedBy+hash); return NULL; } - // p is there + // rid is there pNode = pNode->next; if (pNode == NULL) { taosUnlockList(pSet->lockedBy+hash); @@ -356,12 +364,12 @@ void *taosIterateRef(int refId, void *p) { pNode->count++; // acquire it newP = pNode->p; taosUnlockList(pSet->lockedBy+hash); - uTrace("refId:%d p:%p is returned", refId, p); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); } else { - uTrace("refId:%d p:%p the list is over", refId, p); + uTrace("rsetId:%d the list is over", rsetId); } - if (p) taosReleaseRef(refId, p); // release the current one + if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one taosDecRefCount(pSet); @@ -381,13 +389,13 @@ int taosListRef() { if (pSet->state == TSDB_REF_STATE_EMPTY) continue; - uInfo("refId:%d state:%d count::%d", i, pSet->state, pSet->count); + uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count); for (int j=0; j < pSet->max; ++j) { pNode = pSet->nodeList[j]; while (pNode) { - uInfo("refId:%d p:%p count:%d", i, pNode->p, pNode->count); + uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count); pNode = pNode->next; num++; } @@ -399,22 +407,6 @@ int taosListRef() { return num; } -static int taosHashRef(SRefSet *pSet, void *p) -{ - int hash = 0; - int64_t v = (int64_t)p; - - for (int i = 0; i < sizeof(v); ++i) { - hash += (int)(v & 0xFFFF); - v = v >> 16; - i = i + 2; - } - - hash = hash % pSet->max; - - return hash; -} - static void taosLockList(int64_t *lockedBy) { int64_t tid = taosGetPthreadId(); int i = 0; @@ -438,12 +430,12 @@ static void taosInitRefModule(void) { static void taosIncRefCount(SRefSet *pSet) { atomic_add_fetch_32(&pSet->count, 1); - uTrace("refId:%d inc count:%d", pSet->refId, pSet->count); + uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count); } static void taosDecRefCount(SRefSet *pSet) { int32_t count = atomic_sub_fetch_32(&pSet->count, 1); - uTrace("refId:%d dec count:%d", pSet->refId, pSet->count); + uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count); if (count > 0) return; @@ -458,7 +450,7 @@ static void taosDecRefCount(SRefSet *pSet) { taosTFree(pSet->lockedBy); tsRefSetNum--; - uTrace("refId:%d is cleaned, refSetNum:%d count:%d", pSet->refId, tsRefSetNum, pSet->count); + uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count); } pthread_mutex_unlock(&tsRefMutex); From 5b735ad09170dea7fce07a0f897f859fe0867746 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 5 Nov 2020 07:28:20 +0000 Subject: [PATCH 02/13] second version --- src/rpc/src/rpcMain.c | 15 +-- src/util/inc/tref.h | 5 +- src/util/src/tref.c | 197 +++++++++++++++++++++++--------------- src/util/tests/trefTest.c | 117 ++++++++++++---------- 4 files changed, 195 insertions(+), 139 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index d01c34c810..05330ebff8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -82,6 +82,7 @@ typedef struct { int8_t oldInUse; // server EP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type + int32_t rid; // refId returned by taosAddRef SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API SRpcEpSet *pSet; // for synchronous API @@ -374,7 +375,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { +int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -403,10 +404,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { // set the handle to pContext, so app can cancel the request if (pMsg->handle) *((void **)pMsg->handle) = pContext; - taosAddRef(tsRpcRefId, pContext); + pContext->rid = taosAddRef(tsRpcRefId, pContext); + rpcSendReqToServer(pRpc, pContext); - return; + return pContext->rid; } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -551,11 +553,10 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { return code; } -void rpcCancelRequest(void *handle) { - SRpcReqContext *pContext = handle; +void rpcCancelRequest(int64_t rid) { - int code = taosAcquireRef(tsRpcRefId, pContext); - if (code < 0) return; + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); + if (pContext == NULL) return; rpcCloseConn(pContext->pConn); diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index 3e5db33cf7..cd5092f30a 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -32,7 +32,10 @@ int taosCloseRef(int refId); // add ref, p is the pointer to resource or pointer ID // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately int64_t taosAddRef(int refId, void *p); -#define taosRemoveRef taosReleaseRef + +// remove ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosRemoveRef(int rsetId, int64_t rid); // acquire ref, rid is the reference ID returned by taosAddRef // return the resource p. On error, NULL is returned, and terrno is set appropriately diff --git a/src/util/src/tref.c b/src/util/src/tref.c index c2adb6d15b..915ed53193 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -29,6 +29,7 @@ typedef struct SRefNode { void *p; // pointer to resource protected, int64_t rid; // reference ID int32_t count; // number of references + int removed; // 1: removed } SRefNode; typedef struct { @@ -51,8 +52,9 @@ static int tsNextId = 0; static void taosInitRefModule(void); static void taosLockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy); -static void taosIncRefCount(SRefSet *pSet); -static void taosDecRefCount(SRefSet *pSet); +static void taosIncRsetCount(SRefSet *pSet); +static void taosDecRsetCount(SRefSet *pSet); +static int taosDecRefCount(int rsetId, int64_t rid, int remove); int taosOpenRef(int max, void (*fp)(void *)) { @@ -86,7 +88,7 @@ int taosOpenRef(int max, void (*fp)(void *)) if (i < TSDB_REF_OBJECTS) { rsetId = tsNextId; pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); pSet->max = max; pSet->nodeList = nodeList; pSet->lockedBy = lockedBy; @@ -134,7 +136,7 @@ int taosCloseRef(int rsetId) pthread_mutex_unlock(&tsRefMutex); - if (deleted) taosDecRefCount(pSet); + if (deleted) taosDecRsetCount(pSet); return 0; } @@ -153,9 +155,9 @@ int64_t taosAddRef(int rsetId, void *p) } pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - taosDecRefCount(pSet); + taosDecRsetCount(pSet); uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p); terrno = TSDB_CODE_REF_ID_REMOVED; return -1; @@ -187,6 +189,12 @@ int64_t taosAddRef(int rsetId, void *p) return rid; } +int taosRemoveRef(int rsetId, int64_t rid) +{ + return taosDecRefCount(rsetId, rid, 1); +} + +// if rid is 0, return the first p in hash list, otherwise, return the next after current rid void *taosAcquireRef(int rsetId, int64_t rid) { int hash; @@ -200,11 +208,17 @@ void *taosAcquireRef(int rsetId, int64_t rid) return NULL; } + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return NULL; + } + pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid); - taosDecRefCount(pSet); + taosDecRsetCount(pSet); terrno = TSDB_CODE_REF_ID_REMOVED; return NULL; } @@ -223,9 +237,14 @@ void *taosAcquireRef(int rsetId, int64_t rid) } if (pNode) { - pNode->count++; - p = pNode->p; - uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); + if (pNode->removed == 0) { + pNode->count++; + p = pNode->p; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); + } else { + terrno = TSDB_CODE_REF_NOT_EXIST; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid); + } } else { terrno = TSDB_CODE_REF_NOT_EXIST; uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid); @@ -233,75 +252,14 @@ void *taosAcquireRef(int rsetId, int64_t rid) taosUnlockList(pSet->lockedBy+hash); - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return p; } int taosReleaseRef(int rsetId, int64_t rid) { - int hash; - SRefNode *pNode; - SRefSet *pSet; - int released = 0; - - if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { - uTrace("rsetId:%d rid:%" PRId64 " failed to release, rsetId not valid", rsetId, rid); - terrno = TSDB_CODE_REF_INVALID_ID; - return -1; - } - - pSet = tsRefSetList + rsetId; - if (pSet->state == TSDB_REF_STATE_EMPTY) { - uTrace("rsetId:%d rid:%" PRId64 " failed to release, cleaned", rsetId, rid); - terrno = TSDB_CODE_REF_ID_REMOVED; - return -1; - } - - terrno = 0; - hash = rid % pSet->max; - taosLockList(pSet->lockedBy+hash); - - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->rid == rid) - break; - - pNode = pNode->next; - } - - if (pNode) { - pNode->count--; - - if (pNode->count == 0) { - if ( pNode->prev ) { - pNode->prev->next = pNode->next; - } else { - pSet->nodeList[hash] = pNode->next; - } - - if ( pNode->next ) { - pNode->next->prev = pNode->prev; - } - - (*pSet->fp)(pNode->p); - - uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); - free(pNode); - released = 1; - } else { - uTrace("rsetId:%d p:%p rid:%" PRId64 "is released", rsetId, pNode->p, rid); - } - } else { - uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release", rsetId, rid); - terrno = TSDB_CODE_REF_NOT_EXIST; - } - - taosUnlockList(pSet->lockedBy+hash); - - if (released) taosDecRefCount(pSet); - - return terrno; + return taosDecRefCount(rsetId, rid, 0); } // if rid is 0, return the first p in hash list, otherwise, return the next after current rid @@ -315,12 +273,18 @@ void *taosIterateRef(int rsetId, int64_t rid) { return NULL; } + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return NULL; + } + pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid); terrno = TSDB_CODE_REF_ID_REMOVED; - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return NULL; } @@ -371,7 +335,7 @@ void *taosIterateRef(int rsetId, int64_t rid) { if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return newP; } @@ -407,6 +371,81 @@ int taosListRef() { return num; } +static int taosDecRefCount(int rsetId, int64_t rid, int remove) { + int hash; + SRefSet *pSet; + SRefNode *pNode; + int released = 0; + int code = 0; + + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; + } + + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return -1; + } + + pSet = tsRefSetList + rsetId; + if (pSet->state == TSDB_REF_STATE_EMPTY) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; + } + + terrno = 0; + hash = rid % pSet->max; + + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if (pNode->rid == rid) + break; + + pNode = pNode->next; + } + + if (pNode) { + pNode->count--; + if (remove) pNode->removed = 1; + + if (pNode->count <= 0) { + if (pNode->prev) { + pNode->prev->next = pNode->next; + } else { + pSet->nodeList[hash] = pNode->next; + } + + if (pNode->next) { + pNode->next->prev = pNode->prev; + } + + (*pSet->fp)(pNode->p); + + uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); + free(pNode); + released = 1; + } else { + uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count); + } + } else { + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + code = -1; + } + + taosUnlockList(pSet->lockedBy+hash); + + if (released) taosDecRsetCount(pSet); + + return code; +} + static void taosLockList(int64_t *lockedBy) { int64_t tid = taosGetPthreadId(); int i = 0; @@ -428,12 +467,12 @@ static void taosInitRefModule(void) { pthread_mutex_init(&tsRefMutex, NULL); } -static void taosIncRefCount(SRefSet *pSet) { +static void taosIncRsetCount(SRefSet *pSet) { atomic_add_fetch_32(&pSet->count, 1); uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count); } -static void taosDecRefCount(SRefSet *pSet) { +static void taosDecRsetCount(SRefSet *pSet) { int32_t count = atomic_sub_fetch_32(&pSet->count, 1); uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count); diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index 09ffccd7b5..6887b24abd 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -11,106 +11,119 @@ #include "tulog.h" typedef struct { - int refNum; - int steps; - int refId; - void **p; + int refNum; + int steps; + int rsetId; + int64_t rid; + void **p; } SRefSpace; -void iterateRefs(int refId) { +void iterateRefs(int rsetId) { int count = 0; - void *p = taosIterateRef(refId, NULL); + void *p = taosIterateRef(rsetId, NULL); while (p) { // process P count++; - p = taosIterateRef(refId, p); + p = taosIterateRef(rsetId, p); } printf(" %d ", count); } -void *takeRefActions(void *param) { +void *addRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; - int code, id; + int id; + int64_t rid; for (int i=0; i < pSpace->steps; ++i) { - printf("s"); + printf("a"); id = random() % pSpace->refNum; - code = taosAddRef(pSpace->refId, pSpace->p[id]); - usleep(1); - - id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->refId, pSpace->p[id]); - if (code >= 0) { - usleep(id % 5 + 1); - taosReleaseRef(pSpace->refId, pSpace->p[id]); + if (pSpace->rid[id] <= 0) { + pSpace->p[id] = malloc(128); + pSpace->rid[id] = taosAddRef(pSpace->rsetId, pSpace->p[id]); } - - id = random() % pSpace->refNum; - taosRemoveRef(pSpace->refId, pSpace->p[id]); - usleep(id %5 + 1); - - id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->refId, pSpace->p[id]); - if (code >= 0) { - usleep(id % 5 + 1); - taosReleaseRef(pSpace->refId, pSpace->p[id]); - } - - id = random() % pSpace->refNum; - iterateRefs(id); + usleep(100); } - for (int i=0; i < pSpace->refNum; ++i) { - taosRemoveRef(pSpace->refId, pSpace->p[i]); - } - - //uInfo("refId:%d thread exits", pSpace->refId); + return NULL; +} + +void *removeRef(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int id; + int64_t rid; + + for (int i=0; i < pSpace->steps; ++i) { + printf("d"); + id = random() % pSpace->refNum; + if (pSpace->rid[id] > 0) { + code = taosRemoveRef(pSpace->rsetId, pSpace->rid[id]); + if (code == 0) pSpace->rid[id] = 0; + } + + usleep(100); + } + + return NULL; +} + +void *acquireRelease(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int id; + int64_t rid; + + for (int i=0; i < pSpace->steps; ++i) { + printf("a"); + + id = random() % pSpace->refNum; + code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]); + if (code >= 0) { + usleep(id % 5 + 1); + taosReleaseRef(pSpace->rsetId, pSpace->p[id]); + } + } return NULL; } void myfree(void *p) { - return; + free(p); } void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; printf("c"); - pSpace->refId = taosOpenRef(50, myfree); + pSpace->rsetId = taosOpenRef(50, myfree); - if (pSpace->refId < 0) { - printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); + if (pSpace->rsetId < 0) { + printf("failed to open ref, reson:%s\n", tstrerror(pSpace->rsetId)); return NULL; } pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum); - for (int i=0; irefNum; ++i) { - pSpace->p[i] = (void *) malloc(128); - } pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_t thread1, thread2, thread3; - pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace)); - pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace)); - pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace)); + pthread_create(&(thread1), &thattr, addRef, (void *)(pSpace)); + pthread_create(&(thread2), &thattr, removeRef, (void *)(pSpace)); + pthread_create(&(thread3), &thattr, acquireRelease, (void *)(pSpace)); pthread_join(thread1, NULL); pthread_join(thread2, NULL); pthread_join(thread3, NULL); - taosCloseRef(pSpace->refId); - for (int i=0; irefNum; ++i) { - free(pSpace->p[i]); + taosRemoveRef(pSpace->rsetId, pSpace->rid[i]); } - uInfo("refId:%d main thread exit", pSpace->refId); + taosCloseRef(pSpace->rsetId); + + uInfo("rsetId:%d main thread exit", pSpace->rsetId); free(pSpace->p); pSpace->p = NULL; @@ -140,7 +153,7 @@ int main(int argc, char *argv[]) { printf("\nusage: %s [options] \n", argv[0]); printf(" [-n]: number of references, default: %d\n", refNum); printf(" [-s]: steps to run for each reference, default: %d\n", steps); - printf(" [-t]: number of refIds running in parallel, default: %d\n", threads); + printf(" [-t]: number of rsetIds running in parallel, default: %d\n", threads); printf(" [-l]: number of loops, default: %d\n", loops); printf(" [-d]: debugFlag, default: %d\n", uDebugFlag); exit(0); From acc5c53b126e079f25613ab15c3c1354ce031343 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 5 Nov 2020 13:15:23 +0000 Subject: [PATCH 03/13] make changes on app --- src/client/inc/tsclient.h | 3 +- src/client/src/tscServer.c | 24 ++++----- src/client/src/tscSql.c | 24 ++++----- src/client/src/tscUtil.c | 2 +- src/inc/trpc.h | 4 +- src/inc/tsync.h | 14 ++--- src/mnode/src/mnodeSdb.c | 6 +-- src/rpc/src/rpcMain.c | 6 +-- src/sync/inc/syncInt.h | 1 + src/sync/src/syncMain.c | 104 +++++++++++++++++-------------------- src/sync/test/syncServer.c | 2 +- src/vnode/inc/vnodeInt.h | 2 +- src/vnode/src/vnodeMain.c | 20 +++---- src/wal/inc/walInt.h | 1 + src/wal/src/walMgmt.c | 9 ++-- 15 files changed, 106 insertions(+), 116 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 1c2821638b..4c85af4919 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -330,6 +330,7 @@ typedef struct STscObj { char writeAuth : 1; char superAuth : 1; uint32_t connId; + uint64_t rid; // ref ID returned by taosAddRef struct SSqlObj * pHb; struct SSqlObj * sqlList; struct SSqlStream *streamList; @@ -348,7 +349,7 @@ typedef struct SSqlObj { void *signature; pthread_t owner; // owner of sql object, by which it is executed STscObj *pTscObj; - void *pRpcCtx; + int64_t rpcRid; void (*fp)(); void (*fetchFp)(); void *param; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1584afe706..a3e2eba168 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); - taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer); } else { tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); } } void tscProcessActivityTimer(void *handle, void *tmrId) { - STscObj *pObj = (STscObj *)handle; - - int ret = taosAcquireRef(tscRefId, pObj); - if (ret < 0) { - tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret)); - return; - } + int64_t rid = (int64_t) handle; + STscObj *pObj = taosAcquireRef(tscRefId, rid); + if (pObj == NULL) return; SSqlObj* pHB = pObj->pHb; void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); if (p == NULL) { tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj); + taosReleaseRef(tscRefId, pObj->rid); return; } @@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } - taosReleaseRef(tscRefId, pObj); + taosReleaseRef(tscRefId, rid); } int tscSendMsgToServer(SSqlObj *pSql) { @@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .pCont = pMsg, .contLen = pSql->cmd.payloadLen, .ahandle = pSql, - .handle = &pSql->pRpcCtx, + .handle = NULL, .code = 0 }; @@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pSql->cmd; assert(*pSql->self == pSql); - pSql->pRpcCtx = NULL; + pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); @@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { createHBObj(pObj); //launch a timer to send heartbeat to maintain the connection and send status to mnode - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 89dfa24e8f..bdc46c5446 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa registerSqlObj(pSql); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); - taosAddRef(tscRefId, pObj); + pObj->rid = taosAddRef(tscRefId, pObj); return pSql; } @@ -279,9 +279,9 @@ void taos_close(TAOS *taos) { SSqlObj* pHb = pObj->pHb; if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { - if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode - rpcCancelRequest(pHb->pRpcCtx); - pHb->pRpcCtx = NULL; + if (pHb->rpcRid > 0) { // wait for rsp from dnode + rpcCancelRequest(pHb->rpcRid); + pHb->rpcRid = -1; } tscDebug("%p HB is freed", pHb); @@ -298,7 +298,7 @@ void taos_close(TAOS *taos) { tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); - taosRemoveRef(tscRefId, pObj); + taosRemoveRef(tscRefId, pObj->rid); } void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { @@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { assert(pSubObj->self == (SSqlObj**) p); pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSubObj->pRpcCtx != NULL) { - rpcCancelRequest(pSubObj->pRpcCtx); - pSubObj->pRpcCtx = NULL; + if (pSubObj->rpcRid > 0) { + rpcCancelRequest(pSubObj->rpcRid); + pSubObj->rpcRid = -1; } tscQueueAsyncRes(pSubObj); @@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - assert(pSql->pRpcCtx == NULL); + assert(pSql->rpcRid <= 0); tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { @@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) { * reset and freed in the processMsgFromServer function, and causes the invalid * write problem for rpcCancelRequest. */ - if (pSql->pRpcCtx != NULL) { - rpcCancelRequest(pSql->pRpcCtx); - pSql->pRpcCtx = NULL; + if (pSql->rpcRid > 0) { + rpcCancelRequest(pSql->rpcRid); + pSql->rpcRid = -1; } tscQueueAsyncRes(pSql); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 879eeeaded..2aee90653d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); if (ref == 0) { tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); - taosRemoveRef(tscRefId, pTscObj); + taosRemoveRef(tscRefId, pTscObj->rid); } } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index bdee917b5e..e430a43807 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -83,13 +83,13 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); +int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(void *pContext); +void rpcCancelRequest(int64_t rid); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 671adefab8..1b16fef84c 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -106,13 +106,13 @@ typedef void* tsync_h; int32_t syncInit(); void syncCleanUp(); -tsync_h syncStart(const SSyncInfo *); -void syncStop(tsync_h shandle); -int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); -int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); -void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); -void syncRecover(tsync_h shandle); // recover from other nodes: -int syncGetNodesRole(tsync_h shandle, SNodesRole *); +int64_t syncStart(const SSyncInfo *); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg *); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype); +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); +void syncRecover(int64_t rid); // recover from other nodes: +int syncGetNodesRole(int64_t rid, SNodesRole *); extern char *syncRole[]; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8fb0b33060..f6acb6826c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -72,7 +72,7 @@ typedef struct { ESyncRole role; ESdbStatus status; int64_t version; - void * sync; + int64_t sync; void * wal; SSyncCfg cfg; int32_t numOfTables; @@ -212,7 +212,7 @@ static void sdbRestoreTables() { } void sdbUpdateMnodeRoles() { - if (tsSdbObj.sync == NULL) return; + if (tsSdbObj.sync <= 0) return; SNodesRole roles = {0}; syncGetNodesRole(tsSdbObj.sync, &roles); @@ -433,7 +433,7 @@ void sdbCleanUp() { if (tsSdbObj.sync) { syncStop(tsSdbObj.sync); - tsSdbObj.sync = NULL; + tsSdbObj.sync = -1; } if (tsSdbObj.wal) { diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 05330ebff8..dc67f6a80f 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -560,7 +560,7 @@ void rpcCancelRequest(int64_t rid) { rpcCloseConn(pContext->pConn); - taosReleaseRef(tsRpcRefId, pContext); + taosReleaseRef(tsRpcRefId, rid); } static void rpcFreeMsg(void *msg) { @@ -629,7 +629,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { if (pConn->pContext) pConn->pContext->pConn = NULL; - taosRemoveRef(tsRpcRefId, pConn->pContext); + taosRemoveRef(tsRpcRefId, pConn->pContext->rid); } } @@ -1110,7 +1110,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { } // free the request message - taosRemoveRef(tsRpcRefId, pContext); + taosRemoveRef(tsRpcRefId, pContext->rid); } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index f681810646..8808a82c46 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -141,6 +141,7 @@ typedef struct SSyncNode { int8_t replica; int8_t quorum; uint32_t vgId; + int64_t rid; void *ahandle; int8_t selfIndex; SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 6f5e3be8ab..74e327183b 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -142,14 +142,14 @@ void syncCleanUp() { sInfo("sync module is cleaned up"); } -void *syncStart(const SSyncInfo *pInfo) { +int64_t syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); if (pNode == NULL) { sError("no memory to allocate syncNode"); terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; + return -1; } tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); @@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; - int ret = taosAddRef(tsSyncRefId, pNode); - if (ret < 0) { + pNode->rid = taosAddRef(tsSyncRefId, pNode); + if (pNode->rid < 0) { syncFreeNode(pNode); - return NULL; + return -1; } for (int i = 0; i < pCfg->replica; ++i) { @@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) { if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } nodeVersion = pInfo->version; // set the initial version @@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) { if (pNode->pSyncFwds == NULL) { sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); terrno = TAOS_SYSTEM_ERROR(errno); - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); if (pNode->pFwdTimer == NULL) { sError("vgId:%d, failed to allocate timer", pNode->vgId); - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } syncAddArbitrator(pNode); @@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) { (*pNode->notifyRole)(pNode->ahandle, nodeRole); } - return pNode; + return pNode->rid; } -void syncStop(void *param) { - SSyncNode *pNode = param; +void syncStop(int64_t rid) { SSyncPeer *pPeer; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; sInfo("vgId:%d, cleanup sync", pNode->vgId); @@ -245,16 +244,15 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); - taosRemoveRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); + taosRemoveRef(tsSyncRefId, rid); } -int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { - SSyncNode *pNode = param; +int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { int i, j; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return 0; } -int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return 0; +int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return 0; int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return code; } -void syncConfirmForward(void *param, uint64_t version, int32_t code) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; SSyncPeer *pPeer = pNode->pMaster; if (pPeer && pNode->quorum > 1) { @@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { } } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } -void syncRecover(void *param) { - SSyncNode *pNode = param; +void syncRecover(int64_t rid) { SSyncPeer *pPeer; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -393,14 +386,12 @@ void syncRecover(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } -int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return -1; +int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return -1; pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { @@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { pNodesRole->role[i] = pNode->peerInfo[i]->role; } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return 0; } @@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - taosAcquireRef(tsSyncRefId, pNode); + taosAcquireRef(tsSyncRefId, pNode->rid); return pPeer; } @@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; + if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, pNode->rid); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } static void syncMonitorFwdInfos(void *param, void *tmrId) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if ( ret < 0) return; + int64_t rid = (int64_t) param; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; SSyncFwds *pSyncFwds = pNode->pSyncFwds; @@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 0cf752da97..9dd3feb461 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -30,7 +30,7 @@ int dataFd = -1; void * qhandle = NULL; int walNum = 0; uint64_t tversion = 0; -void * syncHandle; +int64_t syncHandle; int role; int nodeId; char path[256]; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 169334c611..9a7722c57d 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -47,7 +47,7 @@ typedef struct { void *rqueue; void *wal; void *tsdb; - void *sync; + int64_t sync; void *events; void *cq; // continuous query int32_t cfgVersion; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index e206933116..ab1341a0da 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC -tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } -void syncStop(tsync_h shandle) {} -int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } -int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } -void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} +int64_t syncStart(const SSyncInfo *info) { return NULL; } +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; } +void syncStop(int64_t rid) {} +int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; } +int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; } +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} #endif char* vnodeStatus[] = { @@ -330,7 +330,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { #ifndef _SYNC pVnode->role = TAOS_SYNC_ROLE_MASTER; #else - if (pVnode->sync == NULL) { + if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); vnodeCleanUp(pVnode); @@ -589,9 +589,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // stop replication module - if (pVnode->sync) { - void *sync = pVnode->sync; - pVnode->sync = NULL; + if (pVnode->sync > 0) { + int64_t sync = pVnode->sync; + pVnode->sync = -1; syncStop(sync); } diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 5273eb5b1c..d1e9772259 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -43,6 +43,7 @@ extern int32_t wDebugFlag; typedef struct { uint64_t version; int64_t fileId; + int64_t rid; int32_t vgId; int32_t fd; int32_t keep; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index c8f0274174..2c4349ed3a 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } - if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { + pWal->rid = taosAddRef(tsWal.refId, pWal); + if (pWal->rid < 0) { walFreeObj(pWal); return NULL; } @@ -143,7 +144,7 @@ void walClose(void *handle) { } pthread_mutex_unlock(&pWal->mutex); - taosRemoveRef(tsWal.refId, pWal); + taosRemoveRef(tsWal.refId, pWal->rid); } static int32_t walInitObj(SWal *pWal) { @@ -185,7 +186,7 @@ static void walUpdateSeq() { } static void walFsyncAll() { - SWal *pWal = taosIterateRef(tsWal.refId, NULL); + SWal *pWal = taosIterateRef(tsWal.refId, 0); while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); @@ -194,7 +195,7 @@ static void walFsyncAll() { wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } } - pWal = taosIterateRef(tsWal.refId, pWal); + pWal = taosIterateRef(tsWal.refId, pWal->rid); } } From cb0fcf10d5df213eae20da0d4e96f9e47cee0e1a Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 5 Nov 2020 14:40:20 +0000 Subject: [PATCH 04/13] minor changes --- src/rpc/src/rpcMain.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a7123f250c..a24894c2bf 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -221,8 +221,7 @@ static void rpcFree(void *p) { free(p); } -static void rpcInit(void) { - +void rpcInit(void) { tsProgressTimer = tsRpcTimer/2; tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer; tsRpcHeadSize = RPC_MSG_OVERHEAD; @@ -231,6 +230,11 @@ static void rpcInit(void) { tsRpcRefId = taosOpenRef(200, rpcFree); } +void rpcCleanup(void) { + taosCloseRef(tsRpcRefId); + tsRpcRefId = -1; +} + void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; @@ -1621,11 +1625,7 @@ static void rpcDecRef(SRpcInfo *pRpc) tDebug("%s rpc resources are released", pRpc->label); taosTFree(pRpc); - int count = atomic_sub_fetch_32(&tsRpcNum, 1); - if (count == 0) { - // taosCloseRef(tsRpcRefId); - // tsRpcInit = PTHREAD_ONCE_INIT; // windows compliling error - } + atomic_sub_fetch_32(&tsRpcNum, 1); } } From 9bf73b64c3d7641355532db32c31dc33d12d4e1f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 5 Nov 2020 15:18:34 +0000 Subject: [PATCH 05/13] minor changes on debug info --- src/client/src/tscSystem.c | 2 +- src/util/src/tref.c | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 839d5889f3..01046a2840 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -36,7 +36,7 @@ void * tscTmr; void * tscQhandle; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; -int tscRefId; +int tscRefId = -1; int tscNumOfThreads; diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 915ed53193..99d566e2ab 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -82,13 +82,13 @@ int taosOpenRef(int max, void (*fp)(void *)) for (i = 0; i < TSDB_REF_OBJECTS; ++i) { tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; + if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; } if (i < TSDB_REF_OBJECTS) { rsetId = tsNextId; pSet = tsRefSetList + rsetId; - taosIncRsetCount(pSet); pSet->max = max; pSet->nodeList = nodeList; pSet->lockedBy = lockedBy; @@ -96,6 +96,7 @@ int taosOpenRef(int max, void (*fp)(void *)) pSet->rid = 1; pSet->rsetId = rsetId; pSet->state = TSDB_REF_STATE_ACTIVE; + taosIncRsetCount(pSet); tsRefSetNum++; uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum); @@ -469,12 +470,12 @@ static void taosInitRefModule(void) { static void taosIncRsetCount(SRefSet *pSet) { atomic_add_fetch_32(&pSet->count, 1); - uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count); + // uTrace("rsetId:%d inc count:%d", pSet->rsetId, count); } static void taosDecRsetCount(SRefSet *pSet) { int32_t count = atomic_sub_fetch_32(&pSet->count, 1); - uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count); + // uTrace("rsetId:%d dec count:%d", pSet->rsetId, count); if (count > 0) return; From 10e951eb2ea5fc1230418cb9a933549771117d05 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 00:39:39 +0000 Subject: [PATCH 06/13] fix bug in tfile --- src/util/src/tfile.c | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index ea699c2436..c3becef82c 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -26,13 +26,11 @@ static void taosCloseFile(void *p) { } int tfinit() { - tsFileRsetId = taosOpenRef(2000, taosCloseFile); return tsFileRsetId; } void tfcleanup() { - if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); tsFileRsetId = -1; } @@ -45,17 +43,15 @@ int64_t tfopen(const char *pathname, int flags) { return -1; } - int64_t rid = taosAddRef(tsFileRsetId, (void *)(long)fd); - if (rid < 0) { - close(fd); - return -1; - } + void *p = (void *)(long)fd; + int64_t rid = taosAddRef(tsFileRsetId, p); + if (rid < 0) close(fd); return rid; } int64_t tfclose(int64_t tfd) { - return taosReleaseRef(tsFileRsetId, tfd); + return taosRemoveRef(tsFileRsetId, tfd); } ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { @@ -66,10 +62,7 @@ ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { int fd = (int)(uintptr_t)p; ssize_t ret = write(fd, buf, count); - if (ret < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); return ret; @@ -83,10 +76,7 @@ ssize_t tfread(int64_t tfd, void *buf, size_t count) { int fd = (int)(uintptr_t)p; ssize_t ret = read(fd, buf, count); - if (ret < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); return ret; From 54a50d66e5372a87f6a90a22214a897d081bcfe9 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 00:54:51 +0000 Subject: [PATCH 07/13] type conversion --- src/util/src/tfile.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index c3becef82c..0324a5bae8 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -43,7 +43,7 @@ int64_t tfopen(const char *pathname, int flags) { return -1; } - void *p = (void *)(long)fd; + void *p = (void *)(int64_t)fd; int64_t rid = taosAddRef(tsFileRsetId, p); if (rid < 0) close(fd); From 5ffa312614b2561beae816c6d87260afbdd0a742 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 01:10:08 +0000 Subject: [PATCH 08/13] return -1 if error happens --- src/util/src/tfile.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 0324a5bae8..b9a25ffe23 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -57,7 +57,7 @@ int64_t tfclose(int64_t tfd) { ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); - if (p == NULL) return terrno; + if (p == NULL) return -1; int fd = (int)(uintptr_t)p; @@ -71,7 +71,7 @@ ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { ssize_t tfread(int64_t tfd, void *buf, size_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); - if (p == NULL) return terrno; + if (p == NULL) return -1; int fd = (int)(uintptr_t)p; From ecdd067900db71449648a8182673acf31ec986e8 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 01:20:53 +0000 Subject: [PATCH 09/13] size_t type conversion --- src/util/src/tfile.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index b9a25ffe23..4807fea0d0 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -61,7 +61,7 @@ ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { int fd = (int)(uintptr_t)p; - ssize_t ret = write(fd, buf, count); + ssize_t ret = write(fd, buf, (uint32_t)count); if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); @@ -75,7 +75,7 @@ ssize_t tfread(int64_t tfd, void *buf, size_t count) { int fd = (int)(uintptr_t)p; - ssize_t ret = read(fd, buf, count); + ssize_t ret = read(fd, buf, (uint32_t)count); if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); From 4902effb55d878fd009a3c99ed41948870950542 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 01:27:37 +0000 Subject: [PATCH 10/13] data type for rid --- src/rpc/src/rpcMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a24894c2bf..05275c28b0 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -82,7 +82,7 @@ typedef struct { int8_t oldInUse; // server EP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type - int32_t rid; // refId returned by taosAddRef + int64_t rid; // refId returned by taosAddRef SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API SRpcEpSet *pSet; // for synchronous API From 1e34d8ca9a173078e1a82759061c32affbd7b61f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 01:37:59 +0000 Subject: [PATCH 11/13] minor changes, dont set terrno --- src/util/src/tref.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 99d566e2ab..b998dfd43c 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -398,9 +398,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { return -1; } - terrno = 0; hash = rid % pSet->max; - taosLockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash]; From a191917452d021d1741b1a3359f6a8fbd99bc4f2 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 6 Nov 2020 03:59:43 +0000 Subject: [PATCH 12/13] change head file --- src/util/inc/tfile.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 00b2fd6c32..981ee7cecb 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -22,19 +22,19 @@ extern "C" { #include +// init taos file module +int tfinit(); + +// clean up taos file module +void tfcleanup(); + // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused - int64_t tfopen(const char *pathname, int flags); int64_t tfclose(int64_t tfd); ssize_t tfwrite(int64_t tfd, const void *buf, size_t count); ssize_t tfread(int64_t tfd, void *buf, size_t count); -// init taos file module -int tfinit(); - -// clean up taos fle module -void tfcleanup(); #ifdef __cplusplus } From 4735b224ef6a52267a8ca5e9a7a9924c188adb05 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 6 Nov 2020 06:25:06 +0000 Subject: [PATCH 13/13] change read/write to taosRead/Write --- src/util/inc/tfile.h | 9 ++++----- src/util/src/tfile.c | 25 +++++++++++-------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 981ee7cecb..10b7c1df35 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -23,18 +23,17 @@ extern "C" { #include // init taos file module -int tfinit(); +int32_t tfinit(); // clean up taos file module void tfcleanup(); // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused -int64_t tfopen(const char *pathname, int flags); +int64_t tfopen(const char *pathname, int32_t flags); int64_t tfclose(int64_t tfd); -ssize_t tfwrite(int64_t tfd, const void *buf, size_t count); -ssize_t tfread(int64_t tfd, void *buf, size_t count); - +int64_t tfwrite(int64_t tfd, void *buf, int64_t count); +int64_t tfread(int64_t tfd, void *buf, int64_t count); #ifdef __cplusplus } diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 4807fea0d0..27ba30fe81 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -19,13 +19,13 @@ #include "tutil.h" #include "tref.h" -static int tsFileRsetId = -1; +static int32_t tsFileRsetId = -1; static void taosCloseFile(void *p) { - close((int)(uintptr_t)p); + close((int32_t)(uintptr_t)p); } -int tfinit() { +int32_t tfinit() { tsFileRsetId = taosOpenRef(2000, taosCloseFile); return tsFileRsetId; } @@ -35,8 +35,8 @@ void tfcleanup() { tsFileRsetId = -1; } -int64_t tfopen(const char *pathname, int flags) { - int fd = open(pathname, flags); +int64_t tfopen(const char *pathname, int32_t flags) { + int32_t fd = open(pathname, flags); if (fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -54,31 +54,28 @@ int64_t tfclose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); } -ssize_t tfwrite(int64_t tfd, const void *buf, size_t count) { - +int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; - int fd = (int)(uintptr_t)p; + int32_t fd = (int32_t)(uintptr_t)p; - ssize_t ret = write(fd, buf, (uint32_t)count); + int64_t ret = taosWrite(fd, buf, count); if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); return ret; } -ssize_t tfread(int64_t tfd, void *buf, size_t count) { - +int64_t tfread(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; - int fd = (int)(uintptr_t)p; + int32_t fd = (int32_t)(uintptr_t)p; - ssize_t ret = read(fd, buf, (uint32_t)count); + int64_t ret = taosRead(fd, buf, count); if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); return ret; } -