271 lines
7.1 KiB
C
271 lines
7.1 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
* * This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "indexFstFile.h"
|
|
#include "indexComm.h"
|
|
#include "indexFstUtil.h"
|
|
#include "indexInt.h"
|
|
#include "indexUtil.h"
|
|
#include "os.h"
|
|
#include "tutil.h"
|
|
|
|
static int32_t kBlockSize = 4096;
|
|
|
|
typedef struct {
|
|
int32_t blockId;
|
|
int32_t nread;
|
|
char buf[0];
|
|
} SDataBlock;
|
|
|
|
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
|
|
|
|
static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
|
|
char* p = buf;
|
|
SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path));
|
|
SERIALIZE_VAR_TO_BUF(p, '_', char);
|
|
idxInt2str(blockId, p, 0);
|
|
return;
|
|
}
|
|
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
|
|
if (ctx->type == TFILE) {
|
|
assert(len == taosWriteFile(ctx->file.pFile, buf, len));
|
|
} else {
|
|
memcpy(ctx->mem.buf + ctx->offset, buf, len);
|
|
}
|
|
ctx->offset += len;
|
|
return len;
|
|
}
|
|
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
|
|
int nRead = 0;
|
|
if (ctx->type == TFILE) {
|
|
#ifdef USE_MMAP
|
|
nRead = len < ctx->file.size ? len : ctx->file.size;
|
|
memcpy(buf, ctx->file.ptr, nRead);
|
|
#else
|
|
nRead = taosReadFile(ctx->file.pFile, buf, len);
|
|
#endif
|
|
} else {
|
|
memcpy(buf, ctx->mem.buf + ctx->offset, len);
|
|
}
|
|
ctx->offset += nRead;
|
|
|
|
return nRead;
|
|
}
|
|
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
|
int32_t total = 0, nread = 0;
|
|
int32_t blkId = offset / kBlockSize;
|
|
int32_t blkOffset = offset % kBlockSize;
|
|
int32_t blkLeft = kBlockSize - blkOffset;
|
|
|
|
do {
|
|
char key[128] = {0};
|
|
idxGenLRUKey(key, ctx->file.buf, blkId);
|
|
LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));
|
|
|
|
if (h) {
|
|
SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
|
|
nread = TMIN(blkLeft, len);
|
|
memcpy(buf + total, blk->buf + blkOffset, nread);
|
|
taosLRUCacheRelease(ctx->lru, h, false);
|
|
} else {
|
|
int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;
|
|
|
|
SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
|
|
blk->blockId = blkId;
|
|
blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
|
|
assert(blk->nread <= kBlockSize);
|
|
|
|
if (blk->nread < kBlockSize && blk->nread < len) {
|
|
break;
|
|
}
|
|
|
|
nread = TMIN(blkLeft, len);
|
|
memcpy(buf + total, blk->buf + blkOffset, nread);
|
|
|
|
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
|
|
TAOS_LRU_PRIORITY_LOW);
|
|
if (s != TAOS_LRU_STATUS_OK) {
|
|
return -1;
|
|
}
|
|
}
|
|
total += nread;
|
|
len -= nread;
|
|
offset += nread;
|
|
|
|
blkId = offset / kBlockSize;
|
|
blkOffset = offset % kBlockSize;
|
|
blkLeft = kBlockSize - blkOffset;
|
|
|
|
} while (len > 0);
|
|
return total;
|
|
}
|
|
static int idxFileCtxGetSize(IFileCtx* ctx) {
|
|
if (ctx->type == TFILE) {
|
|
int64_t file_size = 0;
|
|
taosStatFile(ctx->file.buf, &file_size, NULL);
|
|
return (int)file_size;
|
|
}
|
|
return 0;
|
|
}
|
|
static int idxFileCtxDoFlush(IFileCtx* ctx) {
|
|
if (ctx->type == TFILE) {
|
|
taosFsyncFile(ctx->file.pFile);
|
|
} else {
|
|
// do nothing
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) {
|
|
IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx));
|
|
if (ctx == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
ctx->type = type;
|
|
if (ctx->type == TFILE) {
|
|
// ugly code, refactor later
|
|
ctx->file.readOnly = readOnly;
|
|
memcpy(ctx->file.buf, path, strlen(path));
|
|
if (readOnly == false) {
|
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
|
taosFtruncateFile(ctx->file.pFile, 0);
|
|
taosStatFile(path, &ctx->file.size, NULL);
|
|
} else {
|
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
|
|
|
int64_t size = 0;
|
|
taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL);
|
|
ctx->file.size = (int)size;
|
|
#ifdef USE_MMAP
|
|
ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size);
|
|
#endif
|
|
}
|
|
if (ctx->file.pFile == NULL) {
|
|
indexError("failed to open file, error %d", errno);
|
|
goto END;
|
|
}
|
|
} else if (ctx->type == TMEMORY) {
|
|
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
|
|
ctx->mem.cap = capacity;
|
|
}
|
|
|
|
ctx->write = idxFileCtxDoWrite;
|
|
ctx->read = idxFileCtxDoRead;
|
|
ctx->flush = idxFileCtxDoFlush;
|
|
ctx->readFrom = idxFileCtxDoReadFrom;
|
|
ctx->size = idxFileCtxGetSize;
|
|
|
|
ctx->offset = 0;
|
|
ctx->limit = capacity;
|
|
|
|
return ctx;
|
|
END:
|
|
if (ctx->type == TMEMORY) {
|
|
taosMemoryFree(ctx->mem.buf);
|
|
}
|
|
taosMemoryFree(ctx);
|
|
return NULL;
|
|
}
|
|
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
|
|
if (ctx->type == TMEMORY) {
|
|
taosMemoryFree(ctx->mem.buf);
|
|
} else {
|
|
ctx->flush(ctx);
|
|
taosCloseFile(&ctx->file.pFile);
|
|
if (ctx->file.readOnly) {
|
|
#ifdef USE_MMAP
|
|
munmap(ctx->file.ptr, ctx->file.size);
|
|
#endif
|
|
}
|
|
if (ctx->file.readOnly == false) {
|
|
int64_t file_size = 0;
|
|
taosStatFile(ctx->file.buf, &file_size, NULL);
|
|
}
|
|
if (remove) {
|
|
unlink(ctx->file.buf);
|
|
}
|
|
}
|
|
taosMemoryFree(ctx);
|
|
}
|
|
|
|
IdxFstFile* idxFileCreate(void* wrt) {
|
|
IdxFstFile* cw = taosMemoryCalloc(1, sizeof(IdxFstFile));
|
|
if (cw == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
cw->wrt = wrt;
|
|
return cw;
|
|
}
|
|
void idxFileDestroy(IdxFstFile* cw) {
|
|
// free wrt object: close fd or free mem
|
|
idxFileFlush(cw);
|
|
// idxFileCtxDestroy((IFileCtx *)(cw->wrt));
|
|
taosMemoryFree(cw);
|
|
}
|
|
|
|
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
|
if (write == NULL) {
|
|
return 0;
|
|
}
|
|
// update checksum
|
|
// write data to file/socket or mem
|
|
IFileCtx* ctx = write->wrt;
|
|
|
|
int nWrite = ctx->write(ctx, buf, len);
|
|
assert(nWrite == len);
|
|
write->count += len;
|
|
|
|
write->summer = taosCalcChecksum(write->summer, buf, len);
|
|
return len;
|
|
}
|
|
|
|
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
|
if (write == NULL) {
|
|
return 0;
|
|
}
|
|
IFileCtx* ctx = write->wrt;
|
|
return ctx->read(ctx, buf, len);
|
|
}
|
|
|
|
uint32_t idxFileMaskedCheckSum(IdxFstFile* write) {
|
|
//////
|
|
return write->summer;
|
|
}
|
|
|
|
int idxFileFlush(IdxFstFile* write) {
|
|
IFileCtx* ctx = write->wrt;
|
|
ctx->flush(ctx);
|
|
return 1;
|
|
}
|
|
|
|
void idxFilePackUintIn(IdxFstFile* writer, uint64_t n, uint8_t nBytes) {
|
|
assert(1 <= nBytes && nBytes <= 8);
|
|
uint8_t* buf = taosMemoryCalloc(8, sizeof(uint8_t));
|
|
for (uint8_t i = 0; i < nBytes; i++) {
|
|
buf[i] = (uint8_t)n;
|
|
n = n >> 8;
|
|
}
|
|
idxFileWrite(writer, buf, nBytes);
|
|
taosMemoryFree(buf);
|
|
return;
|
|
}
|
|
|
|
uint8_t idxFilePackUint(IdxFstFile* writer, uint64_t n) {
|
|
uint8_t nBytes = packSize(n);
|
|
idxFilePackUintIn(writer, n, nBytes);
|
|
return nBytes;
|
|
}
|