From 7d7d4ddbeda7696c4d05f54ef5d45cf830fd2097 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 8 Jun 2020 10:28:32 +0000 Subject: [PATCH 1/8] TD-402 --- src/tsdb/src/tsdbRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 86ce44ad7b..b9ade9496c 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -665,7 +665,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* if ((ASCENDING_TRAVERSE(pQueryHandle->order) && ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.skey))) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && - (((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.skey))))) { + (((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.ekey))))) { // do not load file block into buffer int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; From f8d84036d1cc38a7500db49301931a89f12e1016 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 8 Jun 2020 15:11:17 +0000 Subject: [PATCH 2/8] more --- src/util/src/tkvstore.c | 62 +++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 148d8235a6..03bf4b1b00 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,14 +34,21 @@ #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" +typedef struct { + uint64_t uid; + int64_t offset; + int32_t size; +} SKVRecord; + static int tdInitKVStoreHeader(int fd, char *fname); static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo); -// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); +static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); static char * tdGetKVStoreSnapshotFname(char *fdata); static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); +static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); int tdCreateKVStore(char *fname) { char *tname = strdup(fname); @@ -70,6 +77,7 @@ int tdCreateKVStore(char *fname) { } SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { + SStoreInfo info = {0}; SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); if (pStore == NULL) return NULL; @@ -90,6 +98,13 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH } // TODO: rewind the file + if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; + + if (ftruncate(pStore->fd, info.size) < 0) { + uError("failed to truncate %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } close(pStore->sfd); pStore->sfd = -1; @@ -97,6 +112,18 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH } // TODO: Recover from the file + terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info); + if (terrno != TSDB_CODE_SUCCESS) goto _err; + + if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + while (true) { + + } return pStore; @@ -184,6 +211,25 @@ int tdKVStoreEndCommit(SKVStore *pStore) { return 0; } +static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { + char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; + + if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { + uError("failed to read file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { + uError("file %s is broken", fname); + return -1; + } + + tdDecodeStoreInfo(buf, pInfo); + + return 0; +} + static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; @@ -217,14 +263,14 @@ static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) { return buf; } -// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { -// buf = taosDecodeVariantI64(buf, &(pInfo->size)); -// buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); -// buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); -// buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); +static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); -// return buf; -// } + return buf; +} static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore)); From 652331bd18d3035055cf20e0db0a3287349b82c4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 9 Jun 2020 03:54:08 +0000 Subject: [PATCH 3/8] more --- src/util/src/tkvstore.c | 108 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 03bf4b1b00..f6ccdc9333 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,7 +34,7 @@ #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" -typedef struct { +typedef struct __attribute__((packed)) { uint64_t uid; int64_t offset; int32_t size; @@ -49,6 +49,9 @@ static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); +// static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); +static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); +static int tdRestoreKVStore(SKVStore *pStore); int tdCreateKVStore(char *fname) { char *tname = strdup(fname); @@ -78,6 +81,7 @@ int tdCreateKVStore(char *fname) { SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SStoreInfo info = {0}; + SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); if (pStore == NULL) return NULL; @@ -121,9 +125,11 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - while (true) { - - } + terrno = tdRestoreKVStore(pStore); + if (terrno != TSDB_CODE_SUCCESS) goto _err; + + close(pStore->fd); + pStore->fd = -1; return pStore; @@ -335,4 +341,98 @@ static char *tdGetKVStoreNewFname(char *fdata) { } sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX); return fname; +} + +// static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { +// buf = taosEncodeFixedU64(buf, pRecord->uid); +// buf = taosEncodeFixedI64(buf, pRecord->offset); +// buf = taosEncodeFixedI32(buf, pRecord->size); + +// return buf; +// } + +static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosDecodeFixedU64(buf, &(pRecord->uid)); + buf = taosDecodeFixedI64(buf, &(pRecord->offset)); + buf = taosDecodeFixedI32(buf, &(pRecord->size)); + + return buf; +} + +static int tdRestoreKVStore(SKVStore *pStore) { + char tbuf[128] = "\0"; + char * buf = NULL; + SKVRecord rInfo = {0}; + + while (true) { + ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); + if (tsize == 0) break; + if (tsize < sizeof(SKVRecord)) { + uError("failed to read %d bytes from file %s since %s", sizeof(SKVRecord), pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); + ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); + + if (rInfo.offset < 0) { + taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); + } else { + ASSERT(rInfo.offset > 0); + if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { + uError("failed to put record in KV store %s", pStore->fname); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + } + + SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); + if (pIter == NULL) { + uError("failed to create hash iter while opening KV store %s", pStore->fname); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } + + while (taosHashIterNext(pIter)) { + SKVRecord *pRecord = taosHashIterGet(pIter); + + if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) { + uError("failed to read %d bytes from file %s since %s", pRecord->size, pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) { + uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + goto _err; + } + + if (pStore->iFunc) (*pStore->iFunc)(pStore->appH,buf, pRecord->size); + } + + taosHashDestroyIter(pIter); + + if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); + + tfree(buf); + return TSDB_CODE_SUCCESS; + +_err: + tfree(buf); + return terrno; } \ No newline at end of file From 0f06fb97af83abd867311ce67c29e6dcc55137f5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 9 Jun 2020 05:45:26 +0000 Subject: [PATCH 4/8] TD-353 --- src/tsdb/inc/tsdbMain.h | 4 ++-- src/tsdb/src/tsdbCache.c | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index e06778a872..9601bbadfe 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -161,7 +161,7 @@ typedef struct { int64_t index; int numOfCacheBlocks; SList * memPool; -} STsdbCachePool; +} STsdbBufferPool; typedef struct { TSKEY keyFirst; @@ -173,7 +173,7 @@ typedef struct { typedef struct { int cacheBlockSize; int totalCacheBlocks; - STsdbCachePool pool; + STsdbBufferPool pool; STsdbCacheBlock *curBlock; SCacheMem * mem; SCacheMem * imem; diff --git a/src/tsdb/src/tsdbCache.c b/src/tsdb/src/tsdbCache.c index edc8472b34..24476d8997 100644 --- a/src/tsdb/src/tsdbCache.c +++ b/src/tsdb/src/tsdbCache.c @@ -35,7 +35,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) pCache->totalCacheBlocks = totalBlocks; pCache->pRepo = pRepo; - STsdbCachePool *pPool = &(pCache->pool); + STsdbBufferPool *pPool = &(pCache->pool); pPool->index = 0; pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *)); if (pPool->memPool == NULL) goto _err; @@ -106,7 +106,7 @@ static void tsdbFreeCacheMem(SCacheMem *mem) { } static int tsdbAllocBlockFromPool(STsdbCache *pCache) { - STsdbCachePool *pPool = &(pCache->pool); + STsdbBufferPool *pPool = &(pCache->pool); tsdbLockRepo(pCache->pRepo); if (listNEles(pPool->memPool) == 0) { @@ -170,7 +170,7 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { } static int tsdbAddCacheBlockToPool(STsdbCache *pCache) { - STsdbCachePool *pPool = &pCache->pool; + STsdbBufferPool *pPool = &pCache->pool; STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize); if (pBlock == NULL) return -1; @@ -184,7 +184,7 @@ static int tsdbAddCacheBlockToPool(STsdbCache *pCache) { } static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) { - STsdbCachePool *pPool = &pCache->pool; + STsdbBufferPool *pPool = &pCache->pool; STsdbCacheBlock *pBlock = NULL; ASSERT(pCache->totalCacheBlocks >= 0); From 969c9cd0ca439d7822d45c7f08c04c52d038c958 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 10 Jun 2020 12:06:29 +0800 Subject: [PATCH 5/8] add more kvstore funcion --- src/util/inc/tkvstore.h | 5 +- src/util/src/tkvstore.c | 179 +++++++++++++++++++++++++++++++--------- 2 files changed, 144 insertions(+), 40 deletions(-) diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 724c94e21d..a57d0e95cf 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -46,11 +46,12 @@ typedef struct { } SKVStore; int tdCreateKVStore(char *fname); -int tdDestroyKVStore(); +int tdDestroyKVStore(char *fname); SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); void tdCloseKVStore(SKVStore *pStore); int tdKVStoreStartCommit(SKVStore *pStore); -int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); +int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdKVStoreEndCommit(SKVStore *pStore); #ifdef __cplusplus diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index f6ccdc9333..0c6c21eb4f 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -49,34 +49,47 @@ static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); -// static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); +static void * tdEncodeKVRecord(void *buf, SKVRecord *pRecord); static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); static int tdRestoreKVStore(SKVStore *pStore); int tdCreateKVStore(char *fname) { - char *tname = strdup(fname); - if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY; - int fd = open(fname, O_RDWR | O_CREAT, 0755); if (fd < 0) { uError("failed to open file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - int code = tdInitKVStoreHeader(fd, fname); - if (code != TSDB_CODE_SUCCESS) return code; + if (tdInitKVStoreHeader(fd, fname) < 0) { + close(fd); + return -1; + } if (fsync(fd) < 0) { uError("failed to fsync file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + close(fd); + return -1; } if (close(fd) < 0) { uError("failed to close file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return TSDB_CODE_SUCCESS; + return 0; +} + +int tdDestroyKVStore(char *fname) { + if (remove(fname) < 0) { + uError("failed to remove file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; } SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { @@ -101,23 +114,31 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - // TODO: rewind the file if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; if (ftruncate(pStore->fd, info.size) < 0) { - uError("failed to truncate %s since %s", pStore->fname, strerror(errno)); + uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; + close(pStore->sfd); pStore->sfd = -1; remove(pStore->fsnap); } - // TODO: Recover from the file - terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info); - if (terrno != TSDB_CODE_SUCCESS) goto _err; + if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; + + struct stat tfstat; + if (fstat(pStore->fd, &tfstat) < 0) { + uError("failed to fstat file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(info.size == tfstat.st_size); if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) { uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); @@ -125,8 +146,9 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - terrno = tdRestoreKVStore(pStore); - if (terrno != TSDB_CODE_SUCCESS) goto _err; + pStore->info.size += TD_KVSTORE_HEADER_SIZE; + + if (tdRestoreKVStore(pStore) < 0) goto _err; close(pStore->fd); pStore->fd = -1; @@ -146,7 +168,11 @@ _err: return NULL; } +void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); } + int tdKVStoreStartCommit(SKVStore *pStore) { + ASSERT(pStore->fd < 0); + pStore->fd = open(pStore->fname, O_RDWR); if (pStore->fd < 0) { uError("failed to open file %s since %s", pStore->fname, strerror(errno)); @@ -180,6 +206,14 @@ int tdKVStoreStartCommit(SKVStore *pStore) { } pStore->sfd = -1; + if (lseek(pStore->fd, 0, SEEK_END) < 0) { + uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR)); + return 0; _err: @@ -195,11 +229,50 @@ _err: return -1; } +int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) { + SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); + if (pRecord != NULL) { + pStore->info.tombSize += (pRecord->size + sizeof(SKVRecord)); + } + + // TODO + return 0; +} + +int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { + SKVRecord rInfo = {0}; + char buf[128] = "\0"; + + SKVRecord *pRecord = taosHashGet(pStore->map, &uid, sizeof(uid)); + if (pRecord == NULL) { + uError("failed to drop KV store record with key " PRIu64 " since not find", uid); + return -1; + } + + rInfo.offset = -pRecord->offset; + rInfo.uid = pRecord->uid; + rInfo.size = pRecord->size; + + void *pBuf = tdEncodeKVRecord(buf, &rInfo); + + if (twrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) { + uError("failed to write %d bytes to file %s since %s", POINTER_DISTANCE(pBuf, buf), pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pStore->info.size += POINTER_DISTANCE(pBuf, buf); + pStore->info.nDels++; + pStore->info.nRecords--; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + + return 0; +} + int tdKVStoreEndCommit(SKVStore *pStore) { ASSERT(pStore->fd > 0); - terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)); - if (terrno != TSDB_CODE_SUCCESS) return -1; + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1; if (fsync(pStore->fd) < 0) { uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); @@ -212,6 +285,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + pStore->fd = -1; remove(pStore->fsnap); return 0; @@ -220,14 +294,21 @@ int tdKVStoreEndCommit(SKVStore *pStore) { static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; + if (lseek(fd, 0, SEEK_SET) < 0) { + uError("failed to lseek file %s since %s", fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (tread(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to read file %s since %s", fname, strerror(errno)); + uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { uError("file %s is broken", fname); + terrno = TSDB_CODE_COM_FILE_CORRUPTED; return -1; } @@ -241,17 +322,19 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { if (lseek(fd, 0, SEEK_SET) < 0) { uError("failed to lseek file %s since %s", fname, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } tdEncodeStoreInfo(buf, pInfo); taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return TSDB_CODE_SUCCESS; + return 0; } static int tdInitKVStoreHeader(int fd, char *fname) { @@ -283,7 +366,10 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void if (pStore == NULL) goto _err; pStore->fname = strdup(fname); - if (pStore->map == NULL) goto _err; + if (pStore->map == NULL) { + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + goto _err; + } pStore->fsnap = tdGetKVStoreSnapshotFname(fname); if (pStore->fsnap == NULL) goto _err; @@ -306,7 +392,6 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void return pStore; _err: - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; tdFreeKVStore(pStore); return NULL; } @@ -343,13 +428,13 @@ static char *tdGetKVStoreNewFname(char *fdata) { return fname; } -// static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { -// buf = taosEncodeFixedU64(buf, pRecord->uid); -// buf = taosEncodeFixedI64(buf, pRecord->offset); -// buf = taosEncodeFixedI32(buf, pRecord->size); +static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosEncodeFixedU64(buf, pRecord->uid); + buf = taosEncodeFixedI64(buf, pRecord->offset); + buf = taosEncodeFixedI32(buf, pRecord->size); -// return buf; -// } + return buf; +} static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { buf = taosDecodeFixedU64(buf, &(pRecord->uid)); @@ -361,9 +446,12 @@ static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { static int tdRestoreKVStore(SKVStore *pStore) { char tbuf[128] = "\0"; - char * buf = NULL; + void * buf = NULL; + int maxBufSize = 0; SKVRecord rInfo = {0}; + ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR)); + while (true) { ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); if (tsize == 0) break; @@ -375,17 +463,25 @@ static int tdRestoreKVStore(SKVStore *pStore) { char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); + ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true); if (rInfo.offset < 0) { taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); + pStore->info.size += sizeof(SKVRecord); + pStore->info.nRecords--; + pStore->info.nDels++; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord)); } else { - ASSERT(rInfo.offset > 0); + // TODO: add statistics + ASSERT(rInfo.offset > 0 && rInfo.size > 0); if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { uError("failed to put record in KV store %s", pStore->fname); terrno = TSDB_CODE_COM_OUT_OF_MEMORY; goto _err; } + maxBufSize = MAX(maxBufSize, rInfo.size); + if (lseek(pStore->fd, rInfo.size, SEEK_CUR) < 0) { uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -394,6 +490,13 @@ static int tdRestoreKVStore(SKVStore *pStore) { } } + buf = malloc(maxBufSize); + if (buf == NULL) { + uError("failed to allocate %d bytes in KV store %s", maxBufSize, pStore->fname); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); if (pIter == NULL) { uError("failed to create hash iter while opening KV store %s", pStore->fname); @@ -418,11 +521,11 @@ static int tdRestoreKVStore(SKVStore *pStore) { if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) { uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + terrno = TSDB_CODE_COM_FILE_CORRUPTED; goto _err; } - if (pStore->iFunc) (*pStore->iFunc)(pStore->appH,buf, pRecord->size); + if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size); } taosHashDestroyIter(pIter); @@ -430,9 +533,9 @@ static int tdRestoreKVStore(SKVStore *pStore) { if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); tfree(buf); - return TSDB_CODE_SUCCESS; + return 0; _err: tfree(buf); - return terrno; + return -1; } \ No newline at end of file From dff3aa291d6b4b24b84f71389fb0339bfb4386f8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 10 Jun 2020 12:09:23 +0800 Subject: [PATCH 6/8] error code --- src/inc/taoserror.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5379b371ef..ac2af75742 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -71,6 +71,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "operations TAOS_DEFINE_ERROR(TSDB_CODE_COM_MEMORY_CORRUPTED, 0, 0x0101, "memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_COM_OUT_OF_MEMORY, 0, 0x0102, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_COM_INVALID_CFG_MSG, 0, 0x0103, "invalid config message") +TAOS_DEFINE_ERROR(TSDB_CODE_COM_FILE_CORRUPTED, 0, 0x0104, "file is corrupted") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_SQL, 0, 0x0200, "invalid sql") From 2eee59a16463868f39819af3bc5ce1f6e8cec9cc Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 10 Jun 2020 06:05:55 +0000 Subject: [PATCH 7/8] fix coredump --- src/common/src/tdataformat.c | 6 +++--- src/tsdb/src/tsdbMetaFile.c | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index c01141f0d6..77e91acc14 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -201,10 +201,10 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) pDataCol->len = 0; if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - pDataCol->spaceSize = (sizeof(VarDataLenT) + pDataCol->bytes) * maxPoints; pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); - pDataCol->pData = POINTER_SHIFT(*pBuf, TYPE_BYTES[pDataCol->type] * maxPoints); - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + TYPE_BYTES[pDataCol->type] * maxPoints); + pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); + pDataCol->spaceSize = pDataCol->bytes * maxPoints; + *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints); } else { pDataCol->spaceSize = pDataCol->bytes * maxPoints; pDataCol->dataOff = NULL; diff --git a/src/tsdb/src/tsdbMetaFile.c b/src/tsdb/src/tsdbMetaFile.c index 19fcae94e3..921db8674a 100644 --- a/src/tsdb/src/tsdbMetaFile.c +++ b/src/tsdb/src/tsdbMetaFile.c @@ -105,7 +105,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t c return -1; } - fsync(mfh->fd); + // fsync(mfh->fd); mfh->tombSize++; @@ -132,7 +132,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, uint64_t uid) { return -1; } - fsync(mfh->fd); + // fsync(mfh->fd); mfh->nDel++; @@ -167,7 +167,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t c return -1; } - fsync(mfh->fd); + // fsync(mfh->fd); return 0; } From 69219ae7692b99d81ffd8abd5e7f16fbd2f026f9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 10 Jun 2020 15:37:31 +0800 Subject: [PATCH 8/8] fix compile error --- src/util/src/tkvstore.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 0c6c21eb4f..88cd446349 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -34,10 +34,10 @@ #define TD_KVSTORE_SNAP_SUFFIX ".snap" #define TD_KVSTORE_NEW_SUFFIX ".new" -typedef struct __attribute__((packed)) { +typedef struct { uint64_t uid; int64_t offset; - int32_t size; + int64_t size; } SKVRecord; static int tdInitKVStoreHeader(int fd, char *fname); @@ -431,7 +431,7 @@ static char *tdGetKVStoreNewFname(char *fdata) { static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { buf = taosEncodeFixedU64(buf, pRecord->uid); buf = taosEncodeFixedI64(buf, pRecord->offset); - buf = taosEncodeFixedI32(buf, pRecord->size); + buf = taosEncodeFixedI64(buf, pRecord->size); return buf; } @@ -439,7 +439,7 @@ static void *tdEncodeKVRecord(void *buf, SKVRecord *pRecord) { static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { buf = taosDecodeFixedU64(buf, &(pRecord->uid)); buf = taosDecodeFixedI64(buf, &(pRecord->offset)); - buf = taosDecodeFixedI32(buf, &(pRecord->size)); + buf = taosDecodeFixedI64(buf, &(pRecord->size)); return buf; }