diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 724c94e21d..a57d0e95cf 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -46,11 +46,12 @@ typedef struct { } SKVStore; int tdCreateKVStore(char *fname); -int tdDestroyKVStore(); +int tdDestroyKVStore(char *fname); SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); void tdCloseKVStore(SKVStore *pStore); int tdKVStoreStartCommit(SKVStore *pStore); -int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdKVStoreEndCommit(SKVStore *pStore); #ifdef __cplusplus diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index f6ccdc9333..0c6c21eb4f 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -49,34 +49,47 @@ static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); -// static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); +static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); static int tdRestoreKVStore(SKVStore *pStore); int tdCreateKVStore(char *fname) { - char *tname = strdup(fname); - if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY; - int fd = open(fname, O_RDWR | O_CREAT, 0755); if (fd < 0) { uError("failed to open file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - int code = tdInitKVStoreHeader(fd, fname); - if (code != TSDB_CODE_SUCCESS) return code; + if (tdInitKVStoreHeader(fd, fname) < 0) { + close(fd); + return -1; + } if (fsync(fd) < 0) { uError("failed to fsync file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + close(fd); + return -1; } if (close(fd) < 0) { uError("failed to close file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return TSDB_CODE_SUCCESS; + return 0; +} + +int tdDestroyKVStore(char *fname) { + if (remove(fname) < 0) { + uError("failed to remove file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; } SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { @@ -101,23 +114,31 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - // TODO: rewind the file if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; if (ftruncate(pStore->fd, info.size) < 0) { - uError("failed to truncate %s since %s", pStore->fname, strerror(errno)); + uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; + close(pStore->sfd); pStore->sfd = -1; remove(pStore->fsnap); } - // TODO: Recover from the file - terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info); - if (terrno != TSDB_CODE_SUCCESS) goto _err; + if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; + + struct stat tfstat; + if (fstat(pStore->fd, &tfstat) < 0) { + uError("failed to fstat file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(info.size == tfstat.st_size); if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); @@ -125,8 +146,9 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - terrno = tdRestoreKVStore(pStore); - if (terrno != TSDB_CODE_SUCCESS) goto _err; + pStore->info.size += TD_KVSTORE_HEADER_SIZE; + + if (tdRestoreKVStore(pStore) < 0) goto _err; close(pStore->fd); pStore->fd = -1; @@ -146,7 +168,11 @@ _err: return NULL; } +void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); } + int tdKVStoreStartCommit(SKVStore *pStore) { + ASSERT(pStore->fd < 0); + pStore->fd = open(pStore->fname, O_RDWR); if (pStore->fd < 0) { uError("failed to open file %s since %s", pStore->fname, strerror(errno)); @@ -180,6 +206,14 @@ int tdKVStoreStartCommit(SKVStore *pStore) { } pStore->sfd = -1; + if (lseek(pStore->fd, 0, SEEK_END) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR)); + return 0; _err: @@ -195,11 +229,50 @@ _err: return -1; } +int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) { + SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); + if (pRecord != NULL) { + pStore->info.tombSize += (pRecord->size + sizeof(SKVRecord)); + } + + // TODO + return 0; +} + +int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { + SKVRecord rInfo = {0}; + char buf[128] = "\0"; + + SKVRecord *pRecord = taosHashGet(pStore->map, &uid, sizeof(uid)); + if (pRecord == NULL) { + uError("failed to drop KV store record with key " PRIu64 " since not find", uid); + return -1; + } + + rInfo.offset = -pRecord->offset; + rInfo.uid = pRecord->uid; + rInfo.size = pRecord->size; + + void *pBuf = tdEncodeKVRecord(buf, &rInfo); + + if (twrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) { + uError("failed to write %d bytes to file %s since %s", POINTER_DISTANCE(pBuf, buf), pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pStore->info.size += POINTER_DISTANCE(pBuf, buf); + pStore->info.nDels++; + pStore->info.nRecords--; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + + return 0; +} + int tdKVStoreEndCommit(SKVStore *pStore) { ASSERT(pStore->fd > 0); - terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)); - if (terrno != TSDB_CODE_SUCCESS) return -1; + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1; if (fsync(pStore->fd) < 0) { uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); @@ -212,6 +285,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + pStore->fd = -1; remove(pStore->fsnap); return 0; @@ -220,14 +294,21 @@ int tdKVStoreEndCommit(SKVStore *pStore) { static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; + if (lseek(fd, 0, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to read file %s since %s", fname, strerror(errno)); + uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { uError("file %s is broken", fname); + terrno = TSDB_CODE_COM_FILE_CORRUPTED; return -1; } @@ -241,17 +322,19 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { if (lseek(fd, 0, SEEK_SET) < 0) { uError("failed to lseek file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } tdEncodeStoreInfo(buf, pInfo); taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return TSDB_CODE_SUCCESS; + return 0; } static int tdInitKVStoreHeader(int fd, char *fname) { @@ -283,7 +366,10 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void if (pStore == NULL) goto _err; pStore->fname = strdup(fname); - if (pStore->map == NULL) goto _err; + if (pStore->map == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } pStore->fsnap = tdGetKVStoreSnapshotFname(fname); if (pStore->fsnap == NULL) goto _err; @@ -306,7 +392,6 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void return pStore; _err: - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; tdFreeKVStore(pStore); return NULL; } @@ -343,13 +428,13 @@ static char *tdGetKVStoreNewFname(char *fdata) { return fname; } -// static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { -// buf = taosEncodeFixedU64(buf, pRecord->uid); -// buf = taosEncodeFixedI64(buf, pRecord->offset); -// buf = taosEncodeFixedI32(buf, pRecord->size); +static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosEncodeFixedU64(buf, pRecord->uid); + buf = taosEncodeFixedI64(buf, pRecord->offset); + buf = taosEncodeFixedI32(buf, pRecord->size); -// return buf; -// } + return buf; +} static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { buf = taosDecodeFixedU64(buf, &(pRecord->uid)); @@ -361,9 +446,12 @@ static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { static int tdRestoreKVStore(SKVStore *pStore) { char tbuf[128] = "\0"; - char * buf = NULL; + void * buf = NULL; + int maxBufSize = 0; SKVRecord rInfo = {0}; + ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR)); + while (true) { ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); if (tsize == 0) break; @@ -375,17 +463,25 @@ static int tdRestoreKVStore(SKVStore *pStore) { char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); + ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true); if (rInfo.offset < 0) { taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); + pStore->info.size += sizeof(SKVRecord); + pStore->info.nRecords--; + pStore->info.nDels++; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord)); } else { - ASSERT(rInfo.offset > 0); + // TODO: add statistics + ASSERT(rInfo.offset > 0 && rInfo.size > 0); if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { uError("failed to put record in KV store %s", pStore->fname); terrno = TSDB_CODE_COM_OUT_OF_MEMORY; goto _err; } + maxBufSize = MAX(maxBufSize, rInfo.size); + if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) { uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -394,6 +490,13 @@ static int tdRestoreKVStore(SKVStore *pStore) { } } + buf = malloc(maxBufSize); + if (buf == NULL) { + uError("failed to allocate %d bytes in KV store %s", maxBufSize, pStore->fname); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); if (pIter == NULL) { uError("failed to create hash iter while opening KV store %s", pStore->fname); @@ -418,11 +521,11 @@ static int tdRestoreKVStore(SKVStore *pStore) { if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) { uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + terrno = TSDB_CODE_COM_FILE_CORRUPTED; goto _err; } - if (pStore->iFunc) (*pStore->iFunc)(pStore->appH,buf, pRecord->size); + if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size); } taosHashDestroyIter(pIter); @@ -430,9 +533,9 @@ static int tdRestoreKVStore(SKVStore *pStore) { if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); tfree(buf); - return TSDB_CODE_SUCCESS; + return 0; _err: tfree(buf); - return terrno; + return -1; } \ No newline at end of file