Merge pull request #12697 from taosdata/fix/dnode

fix: acquire vnode on restart may deadlock
This commit is contained in:
Shengliang Guan 2022-05-19 16:43:49 +08:00 committed by GitHub
commit a9aa8e5863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 149 additions and 142 deletions

View File

@ -333,23 +333,23 @@ SSdbRow *sdbAllocRow(int32_t objSize);
void *sdbGetRowObj(SSdbRow *pRow); void *sdbGetRowObj(SSdbRow *pRow);
typedef struct SSdb { typedef struct SSdb {
SMnode *pMnode; SMnode *pMnode;
char *currDir; char *currDir;
char *syncDir; char *syncDir;
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t lastCommitVer;
int64_t curVer; int64_t curVer;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX]; int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX]; EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX]; SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX]; TdThreadRwlock locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX]; SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX]; SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX]; SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX]; SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
} SSdb; } SSdb;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -19,11 +19,11 @@
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->pData->latch); taosThreadRwlockWrlock(&pMgmt->pData->lock);
pMgmt->pData->dnodeId = pCfg->dnodeId; pMgmt->pData->dnodeId = pCfg->dnodeId;
pMgmt->pData->clusterId = pCfg->clusterId; pMgmt->pData->clusterId = pCfg->clusterId;
dmWriteEps(pMgmt->pData); dmWriteEps(pMgmt->pData);
taosWUnLockLatch(&pMgmt->pData->latch); taosThreadRwlockUnlock(&pMgmt->pData->lock);
} }
} }
@ -50,7 +50,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
void dmSendStatusReq(SDnodeMgmt *pMgmt) { void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SStatusReq req = {0}; SStatusReq req = {0};
taosRLockLatch(&pMgmt->pData->latch); taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.sver = tsVersion; req.sver = tsVersion;
req.dnodeVer = pMgmt->pData->dnodeVer; req.dnodeVer = pMgmt->pData->dnodeVer;
req.dnodeId = pMgmt->pData->dnodeId; req.dnodeId = pMgmt->pData->dnodeId;
@ -69,7 +69,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->pData->latch); taosThreadRwlockUnlock(&pMgmt->pData->lock);
SMonVloadInfo vinfo = {0}; SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo); (*pMgmt->getVnodeLoadsFp)(&vinfo);

View File

@ -30,7 +30,6 @@ typedef struct SSnodeMgmt {
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
SRWLatch latch;
int8_t uniqueWorkerInUse; int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SMultiWorker*> SArray *uniqueWorkers; // SArray<SMultiWorker*>
SSingleWorker sharedWorker; SSingleWorker sharedWorker;

View File

@ -26,21 +26,21 @@ extern "C" {
#endif #endif
typedef struct SVnodeMgmt { typedef struct SVnodeMgmt {
SDnodeData *pData; SDnodeData *pData;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
SQWorkerPool queryPool; SQWorkerPool queryPool;
SQWorkerPool fetchPool; SQWorkerPool fetchPool;
SWWorkerPool syncPool; SWWorkerPool syncPool;
SWWorkerPool writePool; SWWorkerPool writePool;
SWWorkerPool mergePool; SWWorkerPool mergePool;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
SHashObj *hash; SHashObj *hash;
SRWLatch latch; TdThreadRwlock lock;
SVnodesStat state; SVnodesStat state;
STfs *pTfs; STfs *pTfs;
} SVnodeMgmt; } SVnodeMgmt;
typedef struct { typedef struct {

View File

@ -17,7 +17,7 @@
#include "vmInt.h" #include "vmInt.h"
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
taosRLockLatch(&pMgmt->latch); taosThreadRwlockRdlock(&pMgmt->lock);
int32_t num = 0; int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->hash); int32_t size = taosHashGetSize(pMgmt->hash);
@ -38,7 +38,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
} }
} }
taosRUnLockLatch(&pMgmt->latch); taosThreadRwlockUnlock(&pMgmt->lock);
*numOfVnodes = num; *numOfVnodes = num;
return pVnodes; return pVnodes;

View File

@ -20,7 +20,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return; if (pInfo->pVloads == NULL) return;
taosRLockLatch(&pMgmt->latch); taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
@ -34,7 +34,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} }
taosRUnLockLatch(&pMgmt->latch); taosThreadRwlockUnlock(&pMgmt->lock);
} }
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {

View File

@ -20,14 +20,14 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
int32_t refCount = 0; int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch); taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) { if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else { } else {
refCount = atomic_add_fetch_32(&pVnode->refCount, 1); refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
} }
taosRUnLockLatch(&pMgmt->latch); taosThreadRwlockUnlock(&pMgmt->lock);
if (pVnode != NULL) { if (pVnode != NULL) {
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
@ -39,9 +39,9 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return; if (pVnode == NULL) return;
taosRLockLatch(&pMgmt->latch); taosThreadRwlockRdlock(&pMgmt->lock);
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
taosRUnLockLatch(&pMgmt->latch); taosThreadRwlockUnlock(&pMgmt->lock);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
} }
@ -70,9 +70,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return -1; return -1;
} }
taosWLockLatch(&pMgmt->latch); taosThreadRwlockWrlock(&pMgmt->lock);
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&pMgmt->latch); taosThreadRwlockUnlock(&pMgmt->lock);
return code; return code;
} }
@ -80,9 +80,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
taosWLockLatch(&pMgmt->latch); taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosWUnLockLatch(&pMgmt->latch); taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
while (pVnode->refCount > 0) taosMsleep(10); while (pVnode->refCount > 0) taosMsleep(10);
@ -239,6 +239,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmStopWorker(pMgmt); vmStopWorker(pMgmt);
vnodeCleanup(); vnodeCleanup();
tfsClose(pMgmt->pTfs); tfsClose(pMgmt->pTfs);
taosThreadRwlockDestroy(&pMgmt->lock);
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
} }
@ -260,7 +261,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosInitRWLatch(&pMgmt->latch); taosThreadRwlockInit(&pMgmt->lock, NULL);
SDiskCfg dCfg = {0}; SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN); tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
@ -334,19 +335,23 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
} }
static int32_t vmStart(SVnodeMgmt *pMgmt) { static int32_t vmStart(SVnodeMgmt *pMgmt) {
taosRLockLatch(&pMgmt->latch); int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
void *pIter = taosHashIterate(pMgmt->hash, NULL); for (int32_t i = 0; i < numOfVnodes; ++i) {
while (pIter) { SVnodeObj *pVnode = pVnodes[i];
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeStart(pVnode->pImpl); vnodeStart(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
} }
taosRUnLockLatch(&pMgmt->latch); for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
vmReleaseVnode(pMgmt, pVnode);
}
if (pVnodes != NULL) {
taosMemoryFree(pVnodes);
}
return 0; return 0;
} }

View File

@ -70,7 +70,7 @@ typedef struct SMgmtWrapper {
const char *name; const char *name;
char *path; char *path;
int32_t refCount; int32_t refCount;
SRWLatch latch; TdThreadRwlock lock;
EDndNodeType ntype; EDndNodeType ntype;
bool deployed; bool deployed;
bool required; bool required;

View File

@ -91,7 +91,7 @@ static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) {
return -1; return -1;
} }
taosInitRWLatch(&pData->latch); taosThreadRwlockInit(&pData->lock, NULL);
taosThreadMutexInit(&pDnode->mutex, NULL); taosThreadMutexInit(&pDnode->mutex, NULL);
return 0; return 0;
} }
@ -100,6 +100,7 @@ static void dmClearVars(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path); taosMemoryFreeClear(pWrapper->path);
taosThreadRwlockDestroy(&pWrapper->lock);
} }
if (pDnode->lockfile != NULL) { if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile); taosUnLockFile(pDnode->lockfile);
@ -108,7 +109,7 @@ static void dmClearVars(SDnode *pDnode) {
} }
SDnodeData *pData = &pDnode->data; SDnodeData *pData = &pDnode->data;
taosWLockLatch(&pData->latch); taosThreadRwlockWrlock(&pData->lock);
if (pData->dnodeEps != NULL) { if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps); taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL; pData->dnodeEps = NULL;
@ -117,8 +118,9 @@ static void dmClearVars(SDnode *pDnode) {
taosHashCleanup(pData->dnodeHash); taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL; pData->dnodeHash = NULL;
} }
taosWUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
taosThreadRwlockDestroy(&pData->lock);
taosThreadMutexDestroy(&pDnode->mutex); taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
} }
@ -151,7 +153,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
if (ntype == DNODE) { if (ntype == DNODE) {
pWrapper->proc.ptype = DND_PROC_SINGLE; pWrapper->proc.ptype = DND_PROC_SINGLE;
} }
taosInitRWLatch(&pWrapper->latch); taosThreadRwlockInit(&pWrapper->lock, NULL);
snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path); pWrapper->path = strdup(path);
@ -223,7 +225,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper; SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch); taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed) { if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
@ -231,7 +233,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED; terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL; pRetWrapper = NULL;
} }
taosRUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
return pRetWrapper; return pRetWrapper;
} }
@ -239,7 +241,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0; int32_t code = 0;
taosRLockLatch(&pWrapper->latch); taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) { if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
@ -247,7 +249,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED; terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1; code = -1;
} }
taosRUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
return code; return code;
} }
@ -255,9 +257,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
void dmReleaseWrapper(SMgmtWrapper *pWrapper) { void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return; if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch); taosThreadRwlockRdlock(&pWrapper->lock);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
} }

View File

@ -186,12 +186,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
} }
} }
taosWLockLatch(&pWrapper->latch); taosThreadRwlockWrlock(&pWrapper->lock);
if (pWrapper->pMgmt != NULL) { if (pWrapper->pMgmt != NULL) {
(*pWrapper->func.closeFp)(pWrapper->pMgmt); (*pWrapper->func.closeFp)(pWrapper->pMgmt);
pWrapper->pMgmt = NULL; pWrapper->pMgmt = NULL;
} }
taosWUnLockLatch(&pWrapper->latch); taosThreadRwlockUnlock(&pWrapper->lock);
if (!OnlyInSingleProc(pWrapper)) { if (!OnlyInSingleProc(pWrapper)) {
dmCleanupProc(pWrapper); dmCleanupProc(pWrapper);

View File

@ -94,18 +94,18 @@ typedef void (*GetVnodeLoadsFp)();
typedef void (*GetMnodeLoadsFp)(); typedef void (*GetMnodeLoadsFp)();
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int64_t dnodeVer; int64_t dnodeVer;
int64_t updateTime; int64_t updateTime;
int64_t rebootTime; int64_t rebootTime;
bool dropped; bool dropped;
bool stopped; bool stopped;
SEpSet mnodeEps; SEpSet mnodeEps;
SArray *dnodeEps; SArray *dnodeEps;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SRWLatch latch; TdThreadRwlock lock;
SMsgCb msgCb; SMsgCb msgCb;
} SDnodeData; } SDnodeData;
typedef struct { typedef struct {

View File

@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
taosRLockLatch(&pData->latch); taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
@ -36,7 +36,7 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF
} }
} }
taosRUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
} }
int32_t dmReadEps(SDnodeData *pData) { int32_t dmReadEps(SDnodeData *pData) {
@ -232,7 +232,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) {
int32_t numOfEps = taosArrayGetSize(eps); int32_t numOfEps = taosArrayGetSize(eps);
if (numOfEps <= 0) return; if (numOfEps <= 0) return;
taosWLockLatch(&pData->latch); taosThreadRwlockWrlock(&pData->lock);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps);
if (numOfEps != numOfEpsOld) { if (numOfEps != numOfEpsOld) {
@ -246,7 +246,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) {
} }
} }
taosWUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
} }
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) { static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
@ -292,7 +292,7 @@ static void dmPrintEps(SDnodeData *pData) {
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
bool changed = false; bool changed = false;
if (dnodeId == 0) return changed; if (dnodeId == 0) return changed;
taosRLockLatch(&pData->latch); taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
@ -304,24 +304,23 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
} }
} }
taosRUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
return changed; return changed;
} }
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosRLockLatch(&pData->latch); taosThreadRwlockRdlock(&pData->lock);
*pEpSet = pData->mnodeEps; *pEpSet = pData->mnodeEps;
taosRUnLockLatch(&pData->latch); taosThreadRwlockUnlock(&pData->lock);
} }
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); taosThreadRwlockWrlock(&pData->lock);
taosWLockLatch(&pData->latch);
pData->mnodeEps = *pEpSet; pData->mnodeEps = *pEpSet;
taosThreadRwlockUnlock(&pData->lock);
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} }
taosWUnLockLatch(&pData->latch);
} }

View File

@ -48,7 +48,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
} }
for (ESdbType i = 0; i < SDB_MAX; ++i) { for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]); taosThreadRwlockInit(&pSdb->locks[i], NULL);
pSdb->maxId[i] = 0; pSdb->maxId[i] = 0;
pSdb->tableVer[i] = 0; pSdb->tableVer[i] = 0;
pSdb->keyTypes[i] = SDB_KEY_INT32; pSdb->keyTypes[i] = SDB_KEY_INT32;
@ -98,7 +98,10 @@ void sdbCleanup(SSdb *pSdb) {
taosHashClear(hash); taosHashClear(hash);
taosHashCleanup(hash); taosHashCleanup(hash);
taosThreadRwlockDestroy(&pSdb->locks[i]);
pSdb->hashObjs[i] = NULL; pSdb->hashObjs[i] = NULL;
memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i]));
mDebug("sdb table:%s is cleaned up", sdbTableName(i)); mDebug("sdb table:%s is cleaned up", sdbTableName(i));
} }
@ -134,7 +137,6 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->maxId[sdbType] = 0; pSdb->maxId[sdbType] = 0;
pSdb->hashObjs[sdbType] = hash; pSdb->hashObjs[sdbType] = hash;
taosInitRWLatch(&pSdb->locks[sdbType]);
mDebug("sdb table:%s is initialized", sdbTableName(sdbType)); mDebug("sdb table:%s is initialized", sdbTableName(sdbType));
return 0; return 0;

View File

@ -257,8 +257,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
SRWLatch *pLock = &pSdb->locks[i]; TdThreadRwlock *pLock = &pSdb->locks[i];
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
@ -303,7 +303,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
} }
if (code == 0) { if (code == 0) {

View File

@ -129,12 +129,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
} }
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) { if (pOldRow != NULL) {
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
@ -145,13 +145,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pRow, "insert"); sdbPrintOper(pSdb, pRow, "insert");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
int32_t code = 0; int32_t code = 0;
SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
@ -159,9 +159,9 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*insertFp)(pSdb, pRow->pObj); code = (*insertFp)(pSdb, pRow->pObj);
if (code != 0) { if (code != 0) {
code = terrno; code = terrno;
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
taosHashRemove(hash, pRow->pObj, keySize); taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = code; terrno = code;
return terrno; return terrno;
@ -180,19 +180,19 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pNewRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type];
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
} }
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update"); sdbPrintOper(pSdb, pOldRow, "update");
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
int32_t code = 0; int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
@ -207,12 +207,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno; return terrno;
@ -223,7 +223,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pOldRow, "delete"); sdbPrintOper(pSdb, pOldRow, "delete");
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false); sdbFreeRow(pSdb, pRow, false);
@ -278,12 +278,12 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void *pRet = NULL; void *pRet = NULL;
int32_t keySize = sdbGetkeySize(pSdb, type, pKey); int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
SRWLatch *pLock = &pSdb->locks[type]; TdThreadRwlock *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
if (ppRow == NULL || *ppRow == NULL) { if (ppRow == NULL || *ppRow == NULL) {
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return NULL; return NULL;
} }
@ -306,13 +306,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break; break;
} }
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
return pRet; return pRet;
} }
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
SRWLatch *pLock = &pSdb->locks[pRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_load_32(&pRow->refCount); int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
@ -320,7 +320,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
} }
void sdbRelease(SSdb *pSdb, void *pObj) { void sdbRelease(SSdb *pSdb, void *pObj) {
@ -329,8 +329,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX) return; if (pRow->type >= SDB_MAX) return;
SRWLatch *pLock = &pSdb->locks[pRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "release"); sdbPrintOper(pSdb, pRow, "release");
@ -338,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
sdbFreeRow(pSdb, pRow, true); sdbFreeRow(pSdb, pRow, true);
} }
taosWUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
} }
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
@ -347,8 +347,8 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL; if (hash == NULL) return NULL;
SRWLatch *pLock = &pSdb->locks[type]; TdThreadRwlock *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, pIter); SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) { while (ppRow != NULL) {
@ -363,7 +363,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = pRow->pObj; *ppObj = pRow->pObj;
break; break;
} }
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
return ppRow; return ppRow;
} }
@ -374,18 +374,18 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
SHashObj *hash = sdbGetHash(pSdb, pRow->type); SHashObj *hash = sdbGetHash(pSdb, pRow->type);
if (hash == NULL) return; if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[pRow->type]; TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosThreadRwlockRdlock(pLock);
taosHashCancelIterate(hash, pIter); taosHashCancelIterate(hash, pIter);
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
} }
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return; if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[type]; TdThreadRwlock *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
@ -401,17 +401,17 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
} }
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return 0; if (hash == NULL) return 0;
SRWLatch *pLock = &pSdb->locks[type]; TdThreadRwlock *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosThreadRwlockRdlock(pLock);
int32_t size = taosHashGetSize(hash); int32_t size = taosHashGetSize(hash);
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
return size; return size;
} }
@ -424,8 +424,8 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
int32_t maxId = 0; int32_t maxId = 0;
SRWLatch *pLock = &pSdb->locks[type]; TdThreadRwlock *pLock = &pSdb->locks[type];
taosRLockLatch(pLock); taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL); SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) { while (ppRow != NULL) {
@ -435,7 +435,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
taosRUnLockLatch(pLock); taosThreadRwlockUnlock(pLock);
maxId = TMAX(maxId, pSdb->maxId[type]); maxId = TMAX(maxId, pSdb->maxId[type]);
return maxId + 1; return maxId + 1;