add more kvstore funcion

This commit is contained in:
Hongze Cheng 2020-06-10 12:06:29 +08:00
parent e0e577d3df
commit 969c9cd0ca
2 changed files with 144 additions and 40 deletions

View File

@ -46,11 +46,12 @@ typedef struct {
} SKVStore; } SKVStore;
int tdCreateKVStore(char *fname); int tdCreateKVStore(char *fname);
int tdDestroyKVStore(); int tdDestroyKVStore(char *fname);
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
void tdCloseKVStore(SKVStore *pStore); void tdCloseKVStore(SKVStore *pStore);
int tdKVStoreStartCommit(SKVStore *pStore); int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen); int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore *pStore); int tdKVStoreEndCommit(SKVStore *pStore);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -49,34 +49,47 @@ static char * tdGetKVStoreNewFname(char *fdata);
static void tdFreeKVStore(SKVStore *pStore); static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
// static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord);
static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord);
static int tdRestoreKVStore(SKVStore *pStore); static int tdRestoreKVStore(SKVStore *pStore);
int tdCreateKVStore(char *fname) { int tdCreateKVStore(char *fname) {
char *tname = strdup(fname);
if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY;
int fd = open(fname, O_RDWR | O_CREAT, 0755); int fd = open(fname, O_RDWR | O_CREAT, 0755);
if (fd < 0) { if (fd < 0) {
uError("failed to open file %s since %s", fname, strerror(errno)); uError("failed to open file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
int code = tdInitKVStoreHeader(fd, fname); if (tdInitKVStoreHeader(fd, fname) < 0) {
if (code != TSDB_CODE_SUCCESS) return code; close(fd);
return -1;
}
if (fsync(fd) < 0) { if (fsync(fd) < 0) {
uError("failed to fsync file %s since %s", fname, strerror(errno)); uError("failed to fsync file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd);
return -1;
} }
if (close(fd) < 0) { if (close(fd) < 0) {
uError("failed to close file %s since %s", fname, strerror(errno)); uError("failed to close file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
return TSDB_CODE_SUCCESS; return 0;
}
int tdDestroyKVStore(char *fname) {
if (remove(fname) < 0) {
uError("failed to remove file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
} }
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
@ -101,23 +114,31 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto _err; goto _err;
} }
// TODO: rewind the file
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err;
if (ftruncate(pStore->fd, info.size) < 0) { if (ftruncate(pStore->fd, info.size) < 0) {
uError("failed to truncate %s since %s", pStore->fname, strerror(errno)); uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
close(pStore->sfd); close(pStore->sfd);
pStore->sfd = -1; pStore->sfd = -1;
remove(pStore->fsnap); remove(pStore->fsnap);
} }
// TODO: Recover from the file if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info);
if (terrno != TSDB_CODE_SUCCESS) goto _err; struct stat tfstat;
if (fstat(pStore->fd, &tfstat) < 0) {
uError("failed to fstat file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(info.size == tfstat.st_size);
if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
@ -125,8 +146,9 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto _err; goto _err;
} }
terrno = tdRestoreKVStore(pStore); pStore->info.size += TD_KVSTORE_HEADER_SIZE;
if (terrno != TSDB_CODE_SUCCESS) goto _err;
if (tdRestoreKVStore(pStore) < 0) goto _err;
close(pStore->fd); close(pStore->fd);
pStore->fd = -1; pStore->fd = -1;
@ -146,7 +168,11 @@ _err:
return NULL; return NULL;
} }
void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); }
int tdKVStoreStartCommit(SKVStore *pStore) { int tdKVStoreStartCommit(SKVStore *pStore) {
ASSERT(pStore->fd < 0);
pStore->fd = open(pStore->fname, O_RDWR); pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) { if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno)); uError("failed to open file %s since %s", pStore->fname, strerror(errno));
@ -180,6 +206,14 @@ int tdKVStoreStartCommit(SKVStore *pStore) {
} }
pStore->sfd = -1; pStore->sfd = -1;
if (lseek(pStore->fd, 0, SEEK_END) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR));
return 0; return 0;
_err: _err:
@ -195,11 +229,50 @@ _err:
return -1; return -1;
} }
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) {
SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid));
if (pRecord != NULL) {
pStore->info.tombSize += (pRecord->size + sizeof(SKVRecord));
}
// TODO
return 0;
}
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
SKVRecord rInfo = {0};
char buf[128] = "\0";
SKVRecord *pRecord = taosHashGet(pStore->map, &uid, sizeof(uid));
if (pRecord == NULL) {
uError("failed to drop KV store record with key " PRIu64 " since not find", uid);
return -1;
}
rInfo.offset = -pRecord->offset;
rInfo.uid = pRecord->uid;
rInfo.size = pRecord->size;
void *pBuf = tdEncodeKVRecord(buf, &rInfo);
if (twrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) {
uError("failed to write %d bytes to file %s since %s", POINTER_DISTANCE(pBuf, buf), pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pStore->info.size += POINTER_DISTANCE(pBuf, buf);
pStore->info.nDels++;
pStore->info.nRecords--;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
return 0;
}
int tdKVStoreEndCommit(SKVStore *pStore) { int tdKVStoreEndCommit(SKVStore *pStore) {
ASSERT(pStore->fd > 0); ASSERT(pStore->fd > 0);
terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)); if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1;
if (terrno != TSDB_CODE_SUCCESS) return -1;
if (fsync(pStore->fd) < 0) { if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
@ -212,6 +285,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
pStore->fd = -1;
remove(pStore->fsnap); remove(pStore->fsnap);
return 0; return 0;
@ -220,14 +294,21 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
if (lseek(fd, 0, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to read file %s since %s", fname, strerror(errno)); uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) {
uError("file %s is broken", fname); uError("file %s is broken", fname);
terrno = TSDB_CODE_COM_FILE_CORRUPTED;
return -1; return -1;
} }
@ -241,17 +322,19 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
if (lseek(fd, 0, SEEK_SET) < 0) { if (lseek(fd, 0, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", fname, strerror(errno)); uError("failed to lseek file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
tdEncodeStoreInfo(buf, pInfo); tdEncodeStoreInfo(buf, pInfo);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno)); uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
return TSDB_CODE_SUCCESS; return 0;
} }
static int tdInitKVStoreHeader(int fd, char *fname) { static int tdInitKVStoreHeader(int fd, char *fname) {
@ -283,7 +366,10 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
if (pStore == NULL) goto _err; if (pStore == NULL) goto _err;
pStore->fname = strdup(fname); pStore->fname = strdup(fname);
if (pStore->map == NULL) goto _err; if (pStore->map == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
}
pStore->fsnap = tdGetKVStoreSnapshotFname(fname); pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
if (pStore->fsnap == NULL) goto _err; if (pStore->fsnap == NULL) goto _err;
@ -306,7 +392,6 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
return pStore; return pStore;
_err: _err:
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
tdFreeKVStore(pStore); tdFreeKVStore(pStore);
return NULL; return NULL;
} }
@ -343,13 +428,13 @@ static char *tdGetKVStoreNewFname(char *fdata) {
return fname; return fname;
} }
// static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) {
// buf = taosEncodeFixedU64(buf, pRecord->uid); buf = taosEncodeFixedU64(buf, pRecord->uid);
// buf = taosEncodeFixedI64(buf, pRecord->offset); buf = taosEncodeFixedI64(buf, pRecord->offset);
// buf = taosEncodeFixedI32(buf, pRecord->size); buf = taosEncodeFixedI32(buf, pRecord->size);
// return buf; return buf;
// } }
static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) {
buf = taosDecodeFixedU64(buf, &(pRecord->uid)); buf = taosDecodeFixedU64(buf, &(pRecord->uid));
@ -361,9 +446,12 @@ static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) {
static int tdRestoreKVStore(SKVStore *pStore) { static int tdRestoreKVStore(SKVStore *pStore) {
char tbuf[128] = "\0"; char tbuf[128] = "\0";
char * buf = NULL; void * buf = NULL;
int maxBufSize = 0;
SKVRecord rInfo = {0}; SKVRecord rInfo = {0};
ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR));
while (true) { while (true) {
ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord));
if (tsize == 0) break; if (tsize == 0) break;
@ -375,17 +463,25 @@ static int tdRestoreKVStore(SKVStore *pStore) {
char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); char *pBuf = tdDecodeKVRecord(tbuf, &rInfo);
ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord));
ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true);
if (rInfo.offset < 0) { if (rInfo.offset < 0) {
taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid));
pStore->info.size += sizeof(SKVRecord);
pStore->info.nRecords--;
pStore->info.nDels++;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord));
} else { } else {
ASSERT(rInfo.offset > 0); // TODO: add statistics
ASSERT(rInfo.offset > 0 && rInfo.size > 0);
if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
uError("failed to put record in KV store %s", pStore->fname); uError("failed to put record in KV store %s", pStore->fname);
terrno = TSDB_CODE_COM_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err; goto _err;
} }
maxBufSize = MAX(maxBufSize, rInfo.size);
if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) { if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -394,6 +490,13 @@ static int tdRestoreKVStore(SKVStore *pStore) {
} }
} }
buf = malloc(maxBufSize);
if (buf == NULL) {
uError("failed to allocate %d bytes in KV store %s", maxBufSize, pStore->fname);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); SHashMutableIterator *pIter = taosHashCreateIter(pStore->map);
if (pIter == NULL) { if (pIter == NULL) {
uError("failed to create hash iter while opening KV store %s", pStore->fname); uError("failed to create hash iter while opening KV store %s", pStore->fname);
@ -418,11 +521,11 @@ static int tdRestoreKVStore(SKVStore *pStore) {
if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) { if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) {
uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size); uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_COM_FILE_CORRUPTED;
goto _err; goto _err;
} }
if (pStore->iFunc) (*pStore->iFunc)(pStore->appH,buf, pRecord->size); if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size);
} }
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
@ -430,9 +533,9 @@ static int tdRestoreKVStore(SKVStore *pStore) {
if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); if (pStore->aFunc) (*pStore->aFunc)(pStore->appH);
tfree(buf); tfree(buf);
return TSDB_CODE_SUCCESS; return 0;
_err: _err:
tfree(buf); tfree(buf);
return terrno; return -1;
} }