Merge branch 'feature/tdb' of https://github.com/taosdata/TDengine into feature/meta

This commit is contained in:
Hongze Cheng 2022-03-28 05:51:43 +00:00
commit 417703c9b7
16 changed files with 324 additions and 203 deletions

View File

@ -1,6 +1,6 @@
add_subdirectory(transport) add_subdirectory(transport)
add_subdirectory(sync) add_subdirectory(sync)
# add_subdirectory(tdb) add_subdirectory(tdb)
add_subdirectory(index) add_subdirectory(index)
add_subdirectory(wal) add_subdirectory(wal)
add_subdirectory(parser) add_subdirectory(parser)

View File

@ -9,6 +9,7 @@ target_sources(tdb
"src/db/tdbDb.c" "src/db/tdbDb.c"
"src/db/tdbEnv.c" "src/db/tdbEnv.c"
"src/db/tdbTxn.c" "src/db/tdbTxn.c"
"src/db/tdbOs.c"
"src/page/tdbPage.c" "src/page/tdbPage.c"
"src/page/tdbPageL.c" "src/page/tdbPageL.c"
) )

View File

@ -22,43 +22,6 @@
extern "C" { extern "C" {
#endif #endif
// typedef struct STDb TDB;
// typedef struct STDbEnv TENV;
// typedef struct STDbCurosr TDBC;
// typedef int32_t pgsz_t;
// typedef int32_t cachesz_t;
// typedef int (*TdbKeyCmprFn)(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2);
// // TEVN
// int tdbEnvCreate(TENV **ppEnv, const char *rootDir);
// int tdbEnvOpen(TENV *ppEnv);
// int tdbEnvClose(TENV *pEnv);
// int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize);
// pgsz_t tdbEnvGetPageSize(TENV *pEnv);
// cachesz_t tdbEnvGetCacheSize(TENV *pEnv);
// int tdbEnvBeginTxn(TENV *pEnv);
// int tdbEnvCommit(TENV *pEnv);
// // TDB
// int tdbCreate(TDB **ppDb);
// int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv);
// int tdbClose(TDB *pDb);
// int tdbDrop(TDB *pDb);
// int tdbSetKeyLen(TDB *pDb, int klen);
// int tdbSetValLen(TDB *pDb, int vlen);
// int tdbSetDup(TDB *pDb, int dup);
// int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn);
// int tdbGetKeyLen(TDB *pDb);
// int tdbGetValLen(TDB *pDb);
// int tdbGetDup(TDB *pDb);
// int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -87,7 +87,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
*ppBt = NULL; *ppBt = NULL;
pBt = (SBTree *)calloc(1, sizeof(*pBt)); pBt = (SBTree *)tdbOsCalloc(1, sizeof(*pBt));
if (pBt == NULL) { if (pBt == NULL) {
return -1; return -1;
} }
@ -121,7 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
// TODO: pBt->root // TODO: pBt->root
ret = tdbBtreeOpenImpl(pBt); ret = tdbBtreeOpenImpl(pBt);
if (ret < 0) { if (ret < 0) {
free(pBt); tdbOsFree(pBt);
return -1; return -1;
} }
@ -166,7 +166,7 @@ int tdbBtCursorInsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal,
// TODO: refact code here // TODO: refact code here
pBt = pBtc->pBt; pBt = pBtc->pBt;
if (!pBt->pTmp) { if (!pBt->pTmp) {
pBt->pTmp = (u8 *)malloc(pBt->pageSize); pBt->pTmp = (u8 *)tdbOsMalloc(pBt->pageSize);
if (pBt->pTmp == NULL) { if (pBt->pTmp == NULL) {
return -1; return -1;
} }
@ -550,7 +550,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) { if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) {
pCell = tdbPageGetCell(pParent, sIdx + i); pCell = tdbPageGetCell(pParent, sIdx + i);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell); szDivCell[i] = tdbBtreeCellSize(pParent, pCell);
pDivCell[i] = malloc(szDivCell[i]); pDivCell[i] = tdbOsMalloc(szDivCell[i]);
memcpy(pDivCell[i], pCell, szDivCell[i]); memcpy(pDivCell[i], pCell, szDivCell[i]);
} }
@ -740,13 +740,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
tdbBtreeDecodeCell(pPage, pCell, &cd); tdbBtreeDecodeCell(pPage, pCell, &cd);
// TODO: pCell here may be inserted as an overflow cell, handle it // TODO: pCell here may be inserted as an overflow cell, handle it
SCell *pNewCell = malloc(cd.kLen + 9); SCell *pNewCell = tdbOsMalloc(cd.kLen + 9);
int szNewCell; int szNewCell;
SPgno pgno; SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]); pgno = TDB_PAGE_PGNO(pNews[iNew]);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell); tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell);
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0); tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
free(pNewCell); tdbOsFree(pNewCell);
} }
// move to next new page // move to next new page
@ -798,7 +798,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
if (pDivCell[i]) { if (pDivCell[i]) {
free(pDivCell[i]); tdbOsFree(pDivCell[i]);
} }
} }

View File

@ -34,7 +34,7 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprF
*ppDb = NULL; *ppDb = NULL;
pDb = (STDB *)calloc(1, sizeof(*pDb)); pDb = (STDB *)tdbOsCalloc(1, sizeof(*pDb));
if (pDb == NULL) { if (pDb == NULL) {
return -1; return -1;
} }
@ -101,7 +101,7 @@ int tdbDbcOpen(STDB *pDb, STDBC **ppDbc) {
STDBC *pDbc = NULL; STDBC *pDbc = NULL;
*ppDbc = NULL; *ppDbc = NULL;
pDbc = malloc(sizeof(*pDbc)); pDbc = (STDBC *)tdbOsMalloc(sizeof(*pDbc));
if (pDbc == NULL) { if (pDbc == NULL) {
return -1; return -1;
} }
@ -126,7 +126,7 @@ int tdbDbNext(STDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
int tdbDbcClose(STDBC *pDbc) { int tdbDbcClose(STDBC *pDbc) {
if (pDbc) { if (pDbc) {
free(pDbc); tdbOsFree(pDbc);
} }
return 0; return 0;

View File

@ -27,7 +27,7 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, STEnv **ppEnv)
dsize = strlen(rootDir); dsize = strlen(rootDir);
zsize = sizeof(*pEnv) + dsize * 2 + strlen(TDB_JOURNAL_NAME) + 3; zsize = sizeof(*pEnv) + dsize * 2 + strlen(TDB_JOURNAL_NAME) + 3;
pPtr = (uint8_t *)calloc(1, zsize); pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
if (pPtr == NULL) { if (pPtr == NULL) {
return -1; return -1;
} }

View File

@ -0,0 +1,98 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
#ifndef TDB_FOR_TDENGINE
// tdbOsRead
i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes) {
i64 nRead = 0;
i64 iRead = 0;
u8 *pBuf = (u8 *)pData;
while (nBytes > 0) {
iRead = read(fd, pBuf, nBytes);
if (iRead < 0) {
if (errno == EINTR) {
continue;
} else {
return -1;
}
} else if (iRead == 0) {
break;
}
nRead += iRead;
pBuf += iRead;
nBytes -= iRead;
}
return nRead;
}
// tdbOsPRead
i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset) {
i64 nRead = 0;
i64 iRead = 0;
i64 iOffset = offset;
u8 *pBuf = (u8 *)pData;
while (nBytes > 0) {
iRead = pread(fd, pBuf, nBytes, iOffset);
if (iRead < 0) {
if (errno == EINTR) {
continue;
} else {
return -1;
}
} else if (iRead == 0) {
break;
}
nRead += iRead;
pBuf += iRead;
iOffset += iRead;
nBytes -= iRead;
}
return nRead;
}
// tdbOsWrite
i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes) {
i64 nWrite = 0;
i64 iWrite = 0;
u8 *pBuf = (u8 *)pData;
while (nBytes > 0) {
iWrite = write(fd, pBuf, nBytes);
if (iWrite < 0) {
if (errno == EINTR) {
continue;
}
return -1;
}
nWrite += iWrite;
pBuf += iWrite;
nBytes -= iWrite;
}
return nWrite;
}
#endif

View File

@ -15,16 +15,16 @@
#include "tdbInt.h" #include "tdbInt.h"
struct SPCache { struct SPCache {
int pageSize; int pageSize;
int cacheSize; int cacheSize;
pthread_mutex_t mutex; tdb_mutex_t mutex;
int nFree; int nFree;
SPage *pFree; SPage *pFree;
int nPage; int nPage;
int nHash; int nHash;
SPage **pgHash; SPage **pgHash;
int nRecyclable; int nRecyclable;
SPage lru; SPage lru;
}; };
#define PCACHE_PAGE_HASH(pPgid) \ #define PCACHE_PAGE_HASH(pPgid) \
@ -63,7 +63,7 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
void *pPtr; void *pPtr;
SPage *pPgHdr; SPage *pPgHdr;
pCache = (SPCache *)calloc(1, sizeof(*pCache)); pCache = (SPCache *)tdbOsCalloc(1, sizeof(*pCache));
if (pCache == NULL) { if (pCache == NULL) {
return -1; return -1;
} }
@ -72,7 +72,7 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
pCache->cacheSize = cacheSize; pCache->cacheSize = cacheSize;
if (tdbPCacheOpenImpl(pCache) < 0) { if (tdbPCacheOpenImpl(pCache) < 0) {
free(pCache); tdbOsFree(pCache);
return -1; return -1;
} }
@ -116,13 +116,13 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage) {
} }
} }
static void tdbPCacheInitLock(SPCache *pCache) { pthread_mutex_init(&(pCache->mutex), NULL); } static void tdbPCacheInitLock(SPCache *pCache) { tdbMutexInit(&(pCache->mutex), NULL); }
static void tdbPCacheClearLock(SPCache *pCache) { pthread_mutex_destroy(&(pCache->mutex)); } static void tdbPCacheClearLock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mutex)); }
static void tdbPCacheLock(SPCache *pCache) { pthread_mutex_lock(&(pCache->mutex)); } static void tdbPCacheLock(SPCache *pCache) { tdbMutexLock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { pthread_mutex_unlock(&(pCache->mutex)); } static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mutex)); }
static bool tdbPCacheLocked(SPCache *pCache) { static bool tdbPCacheLocked(SPCache *pCache) {
assert(0); assert(0);
@ -268,7 +268,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// Open the hash table // Open the hash table
pCache->nPage = 0; pCache->nPage = 0;
pCache->nHash = pCache->cacheSize; pCache->nHash = pCache->cacheSize;
pCache->pgHash = (SPage **)calloc(pCache->nHash, sizeof(SPage *)); pCache->pgHash = (SPage **)tdbOsCalloc(pCache->nHash, sizeof(SPage *));
if (pCache->pgHash == NULL) { if (pCache->pgHash == NULL) {
// TODO // TODO
return -1; return -1;

View File

@ -20,8 +20,8 @@ struct SPager {
char *jFileName; char *jFileName;
int pageSize; int pageSize;
uint8_t fid[TDB_FILE_ID_LEN]; uint8_t fid[TDB_FILE_ID_LEN];
int fd; tdb_fd_t fd;
int jfd; tdb_fd_t jfd;
SPCache *pCache; SPCache *pCache;
SPgno dbFileSize; SPgno dbFileSize;
SPgno dbOrigSize; SPgno dbOrigSize;
@ -60,7 +60,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
zsize = sizeof(*pPager) /* SPager */ zsize = sizeof(*pPager) /* SPager */
+ fsize + 1 /* dbFileName */ + fsize + 1 /* dbFileName */
+ fsize + 8 + 1; /* jFileName */ + fsize + 8 + 1; /* jFileName */
pPtr = (uint8_t *)calloc(1, zsize); pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
if (pPtr == NULL) { if (pPtr == NULL) {
return -1; return -1;
} }
@ -80,7 +80,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
// pPager->pCache // pPager->pCache
pPager->pCache = pCache; pPager->pCache = pCache;
pPager->fd = open(pPager->dbFileName, O_RDWR | O_CREAT, 0755); pPager->fd = tdbOsOpen(pPager->dbFileName, O_RDWR | O_CREAT);
if (pPager->fd < 0) { if (pPager->fd < 0) {
return -1; return -1;
} }
@ -90,7 +90,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
return -1; return -1;
} }
pPager->jfd = -1; // pPager->jfd = -1;
pPager->pageSize = tdbPCacheGetPageSize(pCache); pPager->pageSize = tdbPCacheGetPageSize(pCache);
*ppPager = pPager; *ppPager = pPager;
@ -168,7 +168,7 @@ int tdbPagerBegin(SPager *pPager) {
} }
// Open the journal // Open the journal
pPager->jfd = open(pPager->jFileName, O_RDWR | O_CREAT, 0755); pPager->jfd = tdbOsOpen(pPager->jFileName, O_RDWR | O_CREAT);
if (pPager->jfd < 0) { if (pPager->jfd < 0) {
return -1; return -1;
} }
@ -206,11 +206,11 @@ int tdbPagerCommit(SPager *pPager) {
// TODO: release the page // TODO: release the page
} }
fsync(pPager->fd); tdbOsFSync(pPager->fd);
close(pPager->jfd); tdbOsClose(pPager->jfd);
remove(pPager->jFileName); tdbOsRemove(pPager->jFileName);
pPager->jfd = -1; // pPager->jfd = -1;
return 0; return 0;
} }
@ -222,7 +222,7 @@ static int tdbPagerReadPage(SPager *pPager, SPage *pPage) {
ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0); ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0);
offset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize); offset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize);
ret = tdbPRead(pPager->fd, pPage->pData, pPager->pageSize, offset); ret = tdbOsPRead(pPager->fd, pPage->pData, pPager->pageSize, offset);
if (ret < 0) { if (ret < 0) {
// TODO: handle error // TODO: handle error
return -1; return -1;
@ -377,12 +377,12 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
pgno = TDB_PAGE_PGNO(pPage); pgno = TDB_PAGE_PGNO(pPage);
ret = tdbWrite(pPager->jfd, &pgno, sizeof(pgno)); ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
ret = tdbWrite(pPager->jfd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -395,12 +395,12 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
int ret; int ret;
offset = pPage->pageSize * TDB_PAGE_PGNO(pPage); offset = pPage->pageSize * TDB_PAGE_PGNO(pPage);
if (lseek(pPager->fd, offset, SEEK_SET) < 0) { if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
ret = tdbWrite(pPager->fd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;

View File

@ -32,66 +32,3 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) {
return 0; return 0;
} }
// int tdbCheckFileAccess(const char *pathname, int mode) {
// int flags = 0;
// if (mode & TDB_F_OK) {
// flags |= F_OK;
// }
// if (mode & TDB_R_OK) {
// flags |= R_OK;
// }
// if (mode & TDB_W_OK) {
// flags |= W_OK;
// }
// return access(pathname, flags);
// }
int tdbGetFileSize(const char *fname, int pgSize, SPgno *pSize) {
struct stat st;
int ret;
int64_t file_size = 0;
ret = taosStatFile(fname, &file_size, NULL);
if (ret != 0) {
return -1;
}
ASSERT(file_size % pgSize == 0);
*pSize = file_size / pgSize;
return 0;
}
int tdbPRead(int fd, void *pData, int count, i64 offset) {
void *pBuf;
int nbytes;
i64 ioffset;
int iread;
pBuf = pData;
nbytes = count;
ioffset = offset;
while (nbytes > 0) {
iread = pread(fd, pBuf, nbytes, ioffset);
if (iread < 0) {
/* TODO */
} else if (iread == 0) {
return (count - iread);
}
nbytes = nbytes - iread;
pBuf = (void *)((u8 *)pBuf + iread);
ioffset += iread;
}
return count;
}
int tdbWrite(int fd, void *pData, int count) {
// TODO
return write(fd, pData, count);
}

View File

@ -16,10 +16,11 @@
#ifndef _TD_TDB_INTERNAL_H_ #ifndef _TD_TDB_INTERNAL_H_
#define _TD_TDB_INTERNAL_H_ #define _TD_TDB_INTERNAL_H_
#include "os.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
// #include "tdb.h" #include "tdb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -148,6 +149,8 @@ typedef struct SPager SPager;
typedef struct SPCache SPCache; typedef struct SPCache SPCache;
typedef struct SPage SPage; typedef struct SPage SPage;
#include "tdbOs.h"
#include "tdbUtil.h" #include "tdbUtil.h"
#include "tdbPCache.h" #include "tdbPCache.h"

View File

@ -0,0 +1,129 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TDB_OS_H_
#define _TDB_OS_H_
#ifdef __cplusplus
extern "C" {
#endif
// TODO: use cmake to control the option
#define TDB_FOR_TDENGINE
// For memory -----------------
#ifdef TDB_FOR_TDENGINE
#define tdbOsMalloc taosMemoryMalloc
#define tdbOsCalloc taosMemoryCalloc
#define tdbOsRealloc taosMemoryRealloc
#define tdbOsFree taosMemoryFree
#else
#define tdbOsMalloc malloc
#define tdbOsCalloc calloc
#define tdbOsRealloc realloc
#define tdbOsFree free
#endif
// For file and directory -----------------
#ifdef TDB_FOR_TDENGINE
/* file */
typedef TdFilePtr tdb_fd_t;
#define tdbOsOpen taosOpenFile
#define tdbOsClose(FD) taosCloseFile(&(FD))
#define tdbOsRead taosReadFile
#define tdbOsPRead taosPReadFile
#define tdbOsWrite taosWriteFile
#define tdbOsFSync taosFsyncFile
#define tdbOsLSeek taosLSeekFile
#define tdbOsRemove remove
/* directory */
#define tdbOsMkdir taosMkDir
#define tdbOsRmdir taosRemoveDir
#else
/* file */
typedef int tdb_fd_t;
#define tdbOsOpen open
#define tdbOsClose close
i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes);
i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset);
i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes);
#define tdbOsFSync fsync
#define tdbOsLSeek lseek
#define tdbOsRemove remove
/* directory */
#define tdbOsMkdir mkdir
#define tdbOsRmdir rmdir
#endif
// For threads and lock -----------------
#ifdef TDB_FOR_TDENGINE
/* spin lock */
typedef TdThreadSpinlock tdb_spinlock_t;
#define tdbSpinlockInit taosThreadSpinInit
#define tdbSpinlockDestroy taosThreadSpinDestroy
#define tdbSpinlockLock taosThreadSpinLock
#define tdbSpinlockUnlock taosThreadSpinUnlock
#define tdbSpinlockTrylock pthread_spin_trylock
/* mutex lock */
typedef TdThreadMutex tdb_mutex_t;
#define tdbMutexInit taosThreadMutexInit
#define tdbMutexDestroy taosThreadMutexDestroy
#define tdbMutexLock taosThreadMutexLock
#define tdbMutexUnlock taosThreadMutexUnlock
#else
/* spin lock */
typedef pthread_spinlock_t tdb_spinlock_t;
#define tdbSpinlockInit pthread_spin_init
#define tdbSpinlockDestroy pthread_spin_destroy
#define tdbSpinlockLock pthread_spin_lock
#define tdbSpinlockUnlock pthread_spin_unlock
#define tdbSpinlockTrylock pthread_spin_trylock
/* mutex lock */
typedef pthread_mutex_t tdb_mutex_t;
#define tdbMutexInit pthread_mutex_init
#define tdbMutexDestroy pthread_mutex_destroy
#define tdbMutexLock pthread_mutex_lock
#define tdbMutexUnlock pthread_mutex_unlock
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TDB_OS_H_*/

View File

@ -53,10 +53,10 @@ typedef struct __attribute__((__packed__)) {
} SPageFtr; } SPageFtr;
struct SPage { struct SPage {
pthread_spinlock_t lock; tdb_spinlock_t lock;
int pageSize; int pageSize;
u8 *pData; u8 *pData;
SPageMethods *pPageMethods; SPageMethods *pPageMethods;
// Fields below used by pager and am // Fields below used by pager and am
u8 *pPageHdr; u8 *pPageHdr;
u8 *pCellIdx; u8 *pCellIdx;
@ -80,21 +80,21 @@ struct SPage {
#define P_LOCK_BUSY 1 #define P_LOCK_BUSY 1
#define P_LOCK_FAIL -1 #define P_LOCK_FAIL -1
#define TDB_INIT_PAGE_LOCK(pPage) pthread_spin_init(&((pPage)->lock), 0) #define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
#define TDB_DESTROY_PAGE_LOCK(pPage) pthread_spin_destroy(&((pPage)->lock)) #define TDB_DESTROY_PAGE_LOCK(pPage) tdbSpinlockDestroy(&((pPage)->lock))
#define TDB_LOCK_PAGE(pPage) pthread_spin_lock(&((pPage)->lock)) #define TDB_LOCK_PAGE(pPage) tdbSpinlockLock(&((pPage)->lock))
#define TDB_UNLOCK_PAGE(pPage) pthread_spin_unlock(&((pPage)->lock)) #define TDB_UNLOCK_PAGE(pPage) tdbSpinlockUnlock(&((pPage)->lock))
#define TDB_TRY_LOCK_PAGE(pPage) \ #define TDB_TRY_LOCK_PAGE(pPage) \
({ \ ({ \
int ret; \ int ret; \
if (pthread_spin_trylock(&((pPage)->lock)) == 0) { \ if (tdbSpinlockTrylock(&((pPage)->lock)) == 0) { \
ret = P_LOCK_SUCC; \ ret = P_LOCK_SUCC; \
} else if (errno == EBUSY) { \ } else if (errno == EBUSY) { \
ret = P_LOCK_BUSY; \ ret = P_LOCK_BUSY; \
} else { \ } else { \
ret = P_LOCK_FAIL; \ ret = P_LOCK_FAIL; \
} \ } \
ret; \ ret; \
}) })
// APIs // APIs

View File

@ -30,47 +30,37 @@ extern "C" {
int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique); int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique);
// #define TDB_F_OK 0x1 #define TDB_REALLOC(PTR, SIZE) \
// #define TDB_R_OK 0x2 ({ \
// #define TDB_W_OK 0x4 void *nPtr; \
// int tdbCheckFileAccess(const char *pathname, int mode); if ((PTR) == NULL || ((int *)(PTR))[-1] < (SIZE)) { \
nPtr = tdbOsRealloc((PTR) ? (char *)(PTR) - sizeof(int) : NULL, (SIZE) + sizeof(int)); \
int tdbGetFileSize(const char *fname, int pgSize, SPgno *pSize); if (nPtr) { \
((int *)nPtr)[0] = (SIZE); \
int tdbPRead(int fd, void *pData, int count, i64 offset); nPtr = (char *)nPtr + sizeof(int); \
int tdbWrite(int fd, void *pData, int count); } \
} else { \
#define TDB_REALLOC(PTR, SIZE) \ nPtr = (PTR); \
({ \ } \
void *nPtr; \ nPtr; \
if ((PTR) == NULL || ((int *)(PTR))[-1] < (SIZE)) { \
nPtr = realloc((PTR) ? (char *)(PTR) - sizeof(int) : NULL, (SIZE) + sizeof(int)); \
if (nPtr) { \
((int *)nPtr)[0] = (SIZE); \
nPtr = (char *)nPtr + sizeof(int); \
} \
} else { \
nPtr = (PTR); \
} \
nPtr; \
}) })
#define TDB_FREE(PTR) \ #define TDB_FREE(PTR) \
do { \ do { \
if (PTR) { \ if (PTR) { \
free((char *)(PTR) - sizeof(int)); \ tdbOsFree((char *)(PTR) - sizeof(int)); \
} \ } \
} while (0) } while (0)
static inline void *tdbOsMalloc(void *arg, size_t size) { static inline void *tdbDefaultMalloc(void *arg, size_t size) {
void *ptr; void *ptr;
ptr = malloc(size); ptr = tdbOsMalloc(size);
return ptr; return ptr;
} }
static inline void tdbOsFree(void *arg, void *ptr) { free(ptr); } static inline void tdbDefaultFree(void *arg, void *ptr) { tdbOsFree(ptr); }
static inline int tdbPutVarInt(u8 *p, int v) { static inline int tdbPutVarInt(u8 *p, int v) {
int n = 0; int n = 0;

View File

@ -48,7 +48,7 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
*ppPage = NULL; *ppPage = NULL;
size = pageSize + sizeof(*pPage); size = pageSize + sizeof(*pPage);
if (xMalloc == NULL) { if (xMalloc == NULL) {
xMalloc = tdbOsMalloc; xMalloc = tdbDefaultMalloc;
} }
ptr = (u8 *)((*xMalloc)(arg, size)); ptr = (u8 *)((*xMalloc)(arg, size));
@ -76,7 +76,7 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg)
u8 *ptr; u8 *ptr;
if (!xFree) { if (!xFree) {
xFree = tdbOsFree; xFree = tdbDefaultFree;
} }
ptr = pPage->pData; ptr = pPage->pData;
@ -144,7 +144,7 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
} }
// TODO: here has memory leak // TODO: here has memory leak
pNewCell = (SCell *)malloc(szCell); pNewCell = (SCell *)tdbOsMalloc(szCell);
memcpy(pNewCell, pCell, szCell); memcpy(pNewCell, pCell, szCell);
pPage->apOvfl[iOvfl] = pNewCell; pPage->apOvfl[iOvfl] = pNewCell;

View File

@ -11,7 +11,7 @@ typedef struct SPoolMem {
} SPoolMem; } SPoolMem;
static SPoolMem *openPool() { static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)malloc(sizeof(*pPool)); SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool; pPool->prev = pPool->next = pPool;
pPool->size = 0; pPool->size = 0;
@ -31,12 +31,12 @@ static void closePool(SPoolMem *pPool) {
pMem->prev->next = pMem->next; pMem->prev->next = pMem->next;
pPool->size -= pMem->size; pPool->size -= pMem->size;
free(pMem); tdbOsFree(pMem);
} while (1); } while (1);
assert(pPool->size == 0); assert(pPool->size == 0);
free(pPool); tdbOsFree(pPool);
} }
#define clearPool closePool #define clearPool closePool
@ -46,7 +46,7 @@ static void *poolMalloc(void *arg, int size) {
SPoolMem *pPool = (SPoolMem *)arg; SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem; SPoolMem *pMem;
pMem = (SPoolMem *)malloc(sizeof(*pMem) + size); pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size);
if (pMem == NULL) { if (pMem == NULL) {
assert(0); assert(0);
} }
@ -73,7 +73,7 @@ static void poolFree(void *arg, void *ptr) {
pMem->prev->next = pMem->next; pMem->prev->next = pMem->next;
pPool->size -= pMem->size; pPool->size -= pMem->size;
free(pMem); tdbOsFree(pMem);
} }
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {