From 652331bd18d3035055cf20e0db0a3287349b82c4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 9 Jun 2020 03:54:08 +0000 Subject: [PATCH] more --- src/util/src/tkvstore.c | 108 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 03bf4b1b00..f6ccdc9333 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,7 +34,7 @@ #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" -typedef struct { +typedef struct __attribute__((packed)) { uint64_t uid; int64_t offset; int32_t size; @@ -49,6 +49,9 @@ static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); +// static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); +static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); +static int tdRestoreKVStore(SKVStore *pStore); int tdCreateKVStore(char *fname) { char *tname = strdup(fname); @@ -78,6 +81,7 @@ int tdCreateKVStore(char *fname) { SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SStoreInfo info = {0}; + SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); if (pStore == NULL) return NULL; @@ -121,9 +125,11 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - while (true) { - - } + terrno = tdRestoreKVStore(pStore); + if (terrno != TSDB_CODE_SUCCESS) goto _err; + + close(pStore->fd); + pStore->fd = -1; return pStore; @@ -335,4 +341,98 @@ static char *tdGetKVStoreNewFname(char *fdata) { } sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX); return fname; +} + +// static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { +// buf = taosEncodeFixedU64(buf, pRecord->uid); +// buf = taosEncodeFixedI64(buf, pRecord->offset); +// buf = taosEncodeFixedI32(buf, pRecord->size); + +// return buf; +// } + +static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosDecodeFixedU64(buf, &(pRecord->uid)); + buf = taosDecodeFixedI64(buf, &(pRecord->offset)); + buf = taosDecodeFixedI32(buf, &(pRecord->size)); + + return buf; +} + +static int tdRestoreKVStore(SKVStore *pStore) { + char tbuf[128] = "\0"; + char * buf = NULL; + SKVRecord rInfo = {0}; + + while (true) { + ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); + if (tsize == 0) break; + if (tsize < sizeof(SKVRecord)) { + uError("failed to read %d bytes from file %s since %s", sizeof(SKVRecord), pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); + ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); + + if (rInfo.offset < 0) { + taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); + } else { + ASSERT(rInfo.offset > 0); + if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { + uError("failed to put record in KV store %s", pStore->fname); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + } + + SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); + if (pIter == NULL) { + uError("failed to create hash iter while opening KV store %s", pStore->fname); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + while (taosHashIterNext(pIter)) { + SKVRecord *pRecord = taosHashIterGet(pIter); + + if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) { + uError("failed to read %d bytes from file %s since %s", pRecord->size, pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) { + uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + if (pStore->iFunc) (*pStore->iFunc)(pStore->appH,buf, pRecord->size); + } + + taosHashDestroyIter(pIter); + + if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); + + tfree(buf); + return TSDB_CODE_SUCCESS; + +_err: + tfree(buf); + return terrno; } \ No newline at end of file