refactor: change lockfree to rwlock

This commit is contained in:
Shengliang Guan 2022-05-19 15:52:35 +08:00
parent 380780331d
commit 2ee38b94fd
4 changed files with 64 additions and 62 deletions

View File

@ -333,23 +333,23 @@ SSdbRow *sdbAllocRow(int32_t objSize);
void *sdbGetRowObj(SSdbRow *pRow);
typedef struct SSdb {
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
TdThreadRwlock locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
#ifdef __cplusplus

View File

@ -48,7 +48,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
}
for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]);
taosThreadRwlockInit(&pSdb->locks[i], NULL);
pSdb->maxId[i] = 0;
pSdb->tableVer[i] = 0;
pSdb->keyTypes[i] = SDB_KEY_INT32;
@ -98,7 +98,10 @@ void sdbCleanup(SSdb *pSdb) {
taosHashClear(hash);
taosHashCleanup(hash);
taosThreadRwlockDestroy(&pSdb->locks[i]);
pSdb->hashObjs[i] = NULL;
memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i]));
mDebug("sdb table:%s is cleaned up", sdbTableName(i));
}
@ -134,7 +137,6 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->maxId[sdbType] = 0;
pSdb->hashObjs[sdbType] = hash;
taosInitRWLatch(&pSdb->locks[sdbType]);
mDebug("sdb table:%s is initialized", sdbTableName(sdbType));
return 0;

View File

@ -257,8 +257,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i];
SRWLatch *pLock = &pSdb->locks[i];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[i];
taosThreadRwlockWrlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
@ -303,7 +303,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow);
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
if (code == 0) {

View File

@ -129,12 +129,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
}
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno;
@ -145,13 +145,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pRow, "insert");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
@ -159,9 +159,9 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*insertFp)(pSdb, pRow->pObj);
if (code != 0) {
code = terrno;
taosWLockLatch(pLock);
taosThreadRwlockWrlock(pLock);
taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = code;
return terrno;
@ -180,19 +180,19 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pNewRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type];
taosThreadRwlockWrlock(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
}
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update");
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
@ -207,12 +207,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno;
@ -223,7 +223,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pOldRow, "delete");
taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false);
@ -278,12 +278,12 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void *pRet = NULL;
int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
if (ppRow == NULL || *ppRow == NULL) {
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return NULL;
}
@ -306,13 +306,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break;
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return pRet;
}
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "check");
@ -320,7 +320,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true);
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
void sdbRelease(SSdb *pSdb, void *pObj) {
@ -329,8 +329,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX) return;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "release");
@ -338,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
sdbFreeRow(pSdb, pRow, true);
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
@ -347,8 +347,8 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) {
@ -363,7 +363,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = pRow->pObj;
break;
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return ppRow;
}
@ -374,18 +374,18 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
SHashObj *hash = sdbGetHash(pSdb, pRow->type);
if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockRdlock(pLock);
taosHashCancelIterate(hash, pIter);
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
@ -401,17 +401,17 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
ppRow = taosHashIterate(hash, ppRow);
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return 0;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
int32_t size = taosHashGetSize(hash);
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return size;
}
@ -424,8 +424,8 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
int32_t maxId = 0;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
@ -435,7 +435,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
ppRow = taosHashIterate(hash, ppRow);
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
maxId = TMAX(maxId, pSdb->maxId[type]);
return maxId + 1;