commit
b014566260
|
@ -201,10 +201,10 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
|
||||||
|
|
||||||
pDataCol->len = 0;
|
pDataCol->len = 0;
|
||||||
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints;
|
|
||||||
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
|
pDataCol->dataOff = (VarDataOffsetT *)(*pBuf);
|
||||||
pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints);
|
pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints);
|
||||||
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints);
|
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
|
||||||
|
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints);
|
||||||
} else {
|
} else {
|
||||||
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
|
pDataCol->spaceSize = pDataCol->bytes * maxPoints;
|
||||||
pDataCol->dataOff = NULL;
|
pDataCol->dataOff = NULL;
|
||||||
|
|
|
@ -71,6 +71,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "operations
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "memory corrupted")
|
TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "memory corrupted")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "invalid config message")
|
TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "invalid config message")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, 0, 0x0104, "file is corrupted")
|
||||||
|
|
||||||
//client
|
//client
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "invalid sql")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "invalid sql")
|
||||||
|
|
|
@ -161,7 +161,7 @@ typedef struct {
|
||||||
int64_t index;
|
int64_t index;
|
||||||
int numOfCacheBlocks;
|
int numOfCacheBlocks;
|
||||||
SList * memPool;
|
SList * memPool;
|
||||||
} STsdbCachePool;
|
} STsdbBufferPool;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSKEY keyFirst;
|
TSKEY keyFirst;
|
||||||
|
@ -173,7 +173,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int cacheBlockSize;
|
int cacheBlockSize;
|
||||||
int totalCacheBlocks;
|
int totalCacheBlocks;
|
||||||
STsdbCachePool pool;
|
STsdbBufferPool pool;
|
||||||
STsdbCacheBlock *curBlock;
|
STsdbCacheBlock *curBlock;
|
||||||
SCacheMem * mem;
|
SCacheMem * mem;
|
||||||
SCacheMem * imem;
|
SCacheMem * imem;
|
||||||
|
|
|
@ -35,7 +35,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo)
|
||||||
pCache->totalCacheBlocks = totalBlocks;
|
pCache->totalCacheBlocks = totalBlocks;
|
||||||
pCache->pRepo = pRepo;
|
pCache->pRepo = pRepo;
|
||||||
|
|
||||||
STsdbCachePool *pPool = &(pCache->pool);
|
STsdbBufferPool *pPool = &(pCache->pool);
|
||||||
pPool->index = 0;
|
pPool->index = 0;
|
||||||
pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *));
|
pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *));
|
||||||
if (pPool->memPool == NULL) goto _err;
|
if (pPool->memPool == NULL) goto _err;
|
||||||
|
@ -106,7 +106,7 @@ static void tsdbFreeCacheMem(SCacheMem *mem) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
||||||
STsdbCachePool *pPool = &(pCache->pool);
|
STsdbBufferPool *pPool = &(pCache->pool);
|
||||||
|
|
||||||
tsdbLockRepo(pCache->pRepo);
|
tsdbLockRepo(pCache->pRepo);
|
||||||
if (listNEles(pPool->memPool) == 0) {
|
if (listNEles(pPool->memPool) == 0) {
|
||||||
|
@ -170,7 +170,7 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
|
static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
|
||||||
STsdbCachePool *pPool = &pCache->pool;
|
STsdbBufferPool *pPool = &pCache->pool;
|
||||||
|
|
||||||
STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize);
|
STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize);
|
||||||
if (pBlock == NULL) return -1;
|
if (pBlock == NULL) return -1;
|
||||||
|
@ -184,7 +184,7 @@ static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) {
|
static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) {
|
||||||
STsdbCachePool *pPool = &pCache->pool;
|
STsdbBufferPool *pPool = &pCache->pool;
|
||||||
STsdbCacheBlock *pBlock = NULL;
|
STsdbCacheBlock *pBlock = NULL;
|
||||||
|
|
||||||
ASSERT(pCache->totalCacheBlocks >= 0);
|
ASSERT(pCache->totalCacheBlocks >= 0);
|
||||||
|
|
|
@ -105,7 +105,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t c
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fsync(mfh->fd);
|
// fsync(mfh->fd);
|
||||||
|
|
||||||
mfh->tombSize++;
|
mfh->tombSize++;
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, uint64_t uid) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fsync(mfh->fd);
|
// fsync(mfh->fd);
|
||||||
|
|
||||||
mfh->nDel++;
|
mfh->nDel++;
|
||||||
|
|
||||||
|
@ -167,7 +167,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t c
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fsync(mfh->fd);
|
// fsync(mfh->fd);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,11 +46,12 @@ typedef struct {
|
||||||
} SKVStore;
|
} SKVStore;
|
||||||
|
|
||||||
int tdCreateKVStore(char *fname);
|
int tdCreateKVStore(char *fname);
|
||||||
int tdDestroyKVStore();
|
int tdDestroyKVStore(char *fname);
|
||||||
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
|
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
|
||||||
void tdCloseKVStore(SKVStore *pStore);
|
void tdCloseKVStore(SKVStore *pStore);
|
||||||
int tdKVStoreStartCommit(SKVStore *pStore);
|
int tdKVStoreStartCommit(SKVStore *pStore);
|
||||||
int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
|
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
|
||||||
|
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid);
|
||||||
int tdKVStoreEndCommit(SKVStore *pStore);
|
int tdKVStoreEndCommit(SKVStore *pStore);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -34,42 +34,67 @@
|
||||||
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
|
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
|
||||||
#define TD_KVSTORE_NEW_SUFFIX ".new"
|
#define TD_KVSTORE_NEW_SUFFIX ".new"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t uid;
|
||||||
|
int64_t offset;
|
||||||
|
int64_t size;
|
||||||
|
} SKVRecord;
|
||||||
|
|
||||||
static int tdInitKVStoreHeader(int fd, char *fname);
|
static int tdInitKVStoreHeader(int fd, char *fname);
|
||||||
static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo);
|
static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo);
|
||||||
// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
|
static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
|
||||||
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
|
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
|
||||||
static char * tdGetKVStoreSnapshotFname(char *fdata);
|
static char * tdGetKVStoreSnapshotFname(char *fdata);
|
||||||
static char * tdGetKVStoreNewFname(char *fdata);
|
static char * tdGetKVStoreNewFname(char *fdata);
|
||||||
static void tdFreeKVStore(SKVStore *pStore);
|
static void tdFreeKVStore(SKVStore *pStore);
|
||||||
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
|
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
|
||||||
|
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
|
||||||
|
static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord);
|
||||||
|
static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
||||||
|
static int tdRestoreKVStore(SKVStore *pStore);
|
||||||
|
|
||||||
int tdCreateKVStore(char *fname) {
|
int tdCreateKVStore(char *fname) {
|
||||||
char *tname = strdup(fname);
|
|
||||||
if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY;
|
|
||||||
|
|
||||||
int fd = open(fname, O_RDWR | O_CREAT, 0755);
|
int fd = open(fname, O_RDWR | O_CREAT, 0755);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
uError("failed to open file %s since %s", fname, strerror(errno));
|
uError("failed to open file %s since %s", fname, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = tdInitKVStoreHeader(fd, fname);
|
if (tdInitKVStoreHeader(fd, fname) < 0) {
|
||||||
if (code != TSDB_CODE_SUCCESS) return code;
|
close(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (fsync(fd) < 0) {
|
if (fsync(fd) < 0) {
|
||||||
uError("failed to fsync file %s since %s", fname, strerror(errno));
|
uError("failed to fsync file %s since %s", fname, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
close(fd);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (close(fd) < 0) {
|
if (close(fd) < 0) {
|
||||||
uError("failed to close file %s since %s", fname, strerror(errno));
|
uError("failed to close file %s since %s", fname, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tdDestroyKVStore(char *fname) {
|
||||||
|
if (remove(fname) < 0) {
|
||||||
|
uError("failed to remove file %s since %s", fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
||||||
|
SStoreInfo info = {0};
|
||||||
|
|
||||||
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
|
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
|
||||||
if (pStore == NULL) return NULL;
|
if (pStore == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -89,14 +114,44 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: rewind the file
|
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err;
|
||||||
|
|
||||||
|
if (ftruncate(pStore->fd, info.size) < 0) {
|
||||||
|
uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
|
||||||
|
|
||||||
close(pStore->sfd);
|
close(pStore->sfd);
|
||||||
pStore->sfd = -1;
|
pStore->sfd = -1;
|
||||||
remove(pStore->fsnap);
|
remove(pStore->fsnap);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Recover from the file
|
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
|
||||||
|
|
||||||
|
struct stat tfstat;
|
||||||
|
if (fstat(pStore->fd, &tfstat) < 0) {
|
||||||
|
uError("failed to fstat file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(info.size == tfstat.st_size);
|
||||||
|
|
||||||
|
if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) {
|
||||||
|
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStore->info.size += TD_KVSTORE_HEADER_SIZE;
|
||||||
|
|
||||||
|
if (tdRestoreKVStore(pStore) < 0) goto _err;
|
||||||
|
|
||||||
|
close(pStore->fd);
|
||||||
|
pStore->fd = -1;
|
||||||
|
|
||||||
return pStore;
|
return pStore;
|
||||||
|
|
||||||
|
@ -113,7 +168,11 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); }
|
||||||
|
|
||||||
int tdKVStoreStartCommit(SKVStore *pStore) {
|
int tdKVStoreStartCommit(SKVStore *pStore) {
|
||||||
|
ASSERT(pStore->fd < 0);
|
||||||
|
|
||||||
pStore->fd = open(pStore->fname, O_RDWR);
|
pStore->fd = open(pStore->fname, O_RDWR);
|
||||||
if (pStore->fd < 0) {
|
if (pStore->fd < 0) {
|
||||||
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
|
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
@ -147,6 +206,14 @@ int tdKVStoreStartCommit(SKVStore *pStore) {
|
||||||
}
|
}
|
||||||
pStore->sfd = -1;
|
pStore->sfd = -1;
|
||||||
|
|
||||||
|
if (lseek(pStore->fd, 0, SEEK_END) < 0) {
|
||||||
|
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -162,11 +229,50 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) {
|
||||||
|
SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid));
|
||||||
|
if (pRecord != NULL) {
|
||||||
|
pStore->info.tombSize += (pRecord->size + sizeof(SKVRecord));
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
|
||||||
|
SKVRecord rInfo = {0};
|
||||||
|
char buf[128] = "\0";
|
||||||
|
|
||||||
|
SKVRecord *pRecord = taosHashGet(pStore->map, &uid, sizeof(uid));
|
||||||
|
if (pRecord == NULL) {
|
||||||
|
uError("failed to drop KV store record with key " PRIu64 " since not find", uid);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
rInfo.offset = -pRecord->offset;
|
||||||
|
rInfo.uid = pRecord->uid;
|
||||||
|
rInfo.size = pRecord->size;
|
||||||
|
|
||||||
|
void *pBuf = tdEncodeKVRecord(buf, &rInfo);
|
||||||
|
|
||||||
|
if (twrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) {
|
||||||
|
uError("failed to write %d bytes to file %s since %s", POINTER_DISTANCE(pBuf, buf), pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStore->info.size += POINTER_DISTANCE(pBuf, buf);
|
||||||
|
pStore->info.nDels++;
|
||||||
|
pStore->info.nRecords--;
|
||||||
|
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int tdKVStoreEndCommit(SKVStore *pStore) {
|
int tdKVStoreEndCommit(SKVStore *pStore) {
|
||||||
ASSERT(pStore->fd > 0);
|
ASSERT(pStore->fd > 0);
|
||||||
|
|
||||||
terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info));
|
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1;
|
||||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
|
||||||
|
|
||||||
if (fsync(pStore->fd) < 0) {
|
if (fsync(pStore->fd) < 0) {
|
||||||
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
|
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
@ -179,27 +285,56 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
pStore->fd = -1;
|
||||||
|
|
||||||
remove(pStore->fsnap);
|
remove(pStore->fsnap);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
|
||||||
|
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
|
||||||
|
|
||||||
|
if (lseek(fd, 0, SEEK_SET) < 0) {
|
||||||
|
uError("failed to lseek file %s since %s", fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
|
||||||
|
uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) {
|
||||||
|
uError("file %s is broken", fname);
|
||||||
|
terrno = TSDB_CODE_COM_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tdDecodeStoreInfo(buf, pInfo);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
|
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
|
||||||
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
|
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
|
||||||
|
|
||||||
if (lseek(fd, 0, SEEK_SET) < 0) {
|
if (lseek(fd, 0, SEEK_SET) < 0) {
|
||||||
uError("failed to lseek file %s since %s", fname, strerror(errno));
|
uError("failed to lseek file %s since %s", fname, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdEncodeStoreInfo(buf, pInfo);
|
tdEncodeStoreInfo(buf, pInfo);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
|
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
|
||||||
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
|
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
|
||||||
uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno));
|
uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tdInitKVStoreHeader(int fd, char *fname) {
|
static int tdInitKVStoreHeader(int fd, char *fname) {
|
||||||
|
@ -217,21 +352,24 @@ static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
|
static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
|
||||||
// buf = taosDecodeVariantI64(buf, &(pInfo->size));
|
buf = taosDecodeVariantI64(buf, &(pInfo->size));
|
||||||
// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
|
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
|
||||||
// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
|
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
|
||||||
// buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
|
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
|
||||||
|
|
||||||
// return buf;
|
return buf;
|
||||||
// }
|
}
|
||||||
|
|
||||||
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
||||||
SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore));
|
SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore));
|
||||||
if (pStore == NULL) goto _err;
|
if (pStore == NULL) goto _err;
|
||||||
|
|
||||||
pStore->fname = strdup(fname);
|
pStore->fname = strdup(fname);
|
||||||
if (pStore->map == NULL) goto _err;
|
if (pStore->map == NULL) {
|
||||||
|
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
|
pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
|
||||||
if (pStore->fsnap == NULL) goto _err;
|
if (pStore->fsnap == NULL) goto _err;
|
||||||
|
@ -254,7 +392,6 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
|
||||||
return pStore;
|
return pStore;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
|
||||||
tdFreeKVStore(pStore);
|
tdFreeKVStore(pStore);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -290,3 +427,115 @@ static char *tdGetKVStoreNewFname(char *fdata) {
|
||||||
sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX);
|
sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX);
|
||||||
return fname;
|
return fname;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) {
|
||||||
|
buf = taosEncodeFixedU64(buf, pRecord->uid);
|
||||||
|
buf = taosEncodeFixedI64(buf, pRecord->offset);
|
||||||
|
buf = taosEncodeFixedI64(buf, pRecord->size);
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) {
|
||||||
|
buf = taosDecodeFixedU64(buf, &(pRecord->uid));
|
||||||
|
buf = taosDecodeFixedI64(buf, &(pRecord->offset));
|
||||||
|
buf = taosDecodeFixedI64(buf, &(pRecord->size));
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tdRestoreKVStore(SKVStore *pStore) {
|
||||||
|
char tbuf[128] = "\0";
|
||||||
|
void * buf = NULL;
|
||||||
|
int maxBufSize = 0;
|
||||||
|
SKVRecord rInfo = {0};
|
||||||
|
|
||||||
|
ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR));
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord));
|
||||||
|
if (tsize == 0) break;
|
||||||
|
if (tsize < sizeof(SKVRecord)) {
|
||||||
|
uError("failed to read %d bytes from file %s since %s", sizeof(SKVRecord), pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *pBuf = tdDecodeKVRecord(tbuf, &rInfo);
|
||||||
|
ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord));
|
||||||
|
ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true);
|
||||||
|
|
||||||
|
if (rInfo.offset < 0) {
|
||||||
|
taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid));
|
||||||
|
pStore->info.size += sizeof(SKVRecord);
|
||||||
|
pStore->info.nRecords--;
|
||||||
|
pStore->info.nDels++;
|
||||||
|
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord));
|
||||||
|
} else {
|
||||||
|
// TODO: add statistics
|
||||||
|
ASSERT(rInfo.offset > 0 && rInfo.size > 0);
|
||||||
|
if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
|
||||||
|
uError("failed to put record in KV store %s", pStore->fname);
|
||||||
|
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
maxBufSize = MAX(maxBufSize, rInfo.size);
|
||||||
|
|
||||||
|
if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) {
|
||||||
|
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = malloc(maxBufSize);
|
||||||
|
if (buf == NULL) {
|
||||||
|
uError("failed to allocate %d bytes in KV store %s", maxBufSize, pStore->fname);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SHashMutableIterator *pIter = taosHashCreateIter(pStore->map);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
uError("failed to create hash iter while opening KV store %s", pStore->fname);
|
||||||
|
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (taosHashIterNext(pIter)) {
|
||||||
|
SKVRecord *pRecord = taosHashIterGet(pIter);
|
||||||
|
|
||||||
|
if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) {
|
||||||
|
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) {
|
||||||
|
uError("failed to read %d bytes from file %s since %s", pRecord->size, pStore->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) {
|
||||||
|
uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size);
|
||||||
|
terrno = TSDB_CODE_COM_FILE_CORRUPTED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashDestroyIter(pIter);
|
||||||
|
|
||||||
|
if (pStore->aFunc) (*pStore->aFunc)(pStore->appH);
|
||||||
|
|
||||||
|
tfree(buf);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tfree(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
Loading…
Reference in New Issue