Merge pull request #23799 from taosdata/fix/TD-27423
fix(tsdb/skip-row): remove s3 size condition
This commit is contained in:
commit
24932d2d41
|
@ -124,6 +124,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_INVALID_CFG_VALUE TAOS_DEF_ERROR_CODE(0, 0x0133)
|
||||
|
||||
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
|
||||
#define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135)
|
||||
|
||||
//client
|
||||
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
|
||||
|
|
|
@ -891,6 +891,8 @@ long s3Size(const char *object_name) {
|
|||
|
||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
size = cbd.content_length;
|
||||
|
|
|
@ -421,7 +421,7 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
|||
if (mtime < committer->ctx->now - tsS3UploadDelaySec) {
|
||||
committer->ctx->skipTsRow = true;
|
||||
}
|
||||
} else if (s3Size(object_name) > 0) {
|
||||
} else /*if (s3Size(object_name) > 0) */ {
|
||||
committer->ctx->skipTsRow = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ static int32_t tsdbDataFileReadHeadFooter(SDataFileReader *reader) {
|
|||
int32_t ftype = TSDB_FTYPE_HEAD;
|
||||
if (reader->fd[ftype]) {
|
||||
code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(SHeadFooter),
|
||||
(uint8_t *)reader->headFooter, sizeof(SHeadFooter));
|
||||
(uint8_t *)reader->headFooter, sizeof(SHeadFooter), 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -67,7 +67,7 @@ static int32_t tsdbDataFileReadTombFooter(SDataFileReader *reader) {
|
|||
int32_t ftype = TSDB_FTYPE_TOMB;
|
||||
if (reader->fd[ftype]) {
|
||||
code = tsdbReadFile(reader->fd[ftype], reader->config->files[ftype].file.size - sizeof(STombFooter),
|
||||
(uint8_t *)reader->tombFooter, sizeof(STombFooter));
|
||||
(uint8_t *)reader->tombFooter, sizeof(STombFooter), 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
reader->ctx->tombFooterLoaded = true;
|
||||
|
@ -161,7 +161,7 @@ int32_t tsdbDataFileReadBrinBlk(SDataFileReader *reader, const TBrinBlkArray **b
|
|||
}
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], reader->headFooter->brinBlkPtr->offset, data,
|
||||
reader->headFooter->brinBlkPtr->size);
|
||||
reader->headFooter->brinBlkPtr->size, 0);
|
||||
if (code) {
|
||||
taosMemoryFree(data);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -191,7 +191,8 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
|
|||
code = tRealloc(&reader->config->bufArr[0], brinBlk->dp->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size);
|
||||
code =
|
||||
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
int32_t size = 0;
|
||||
|
@ -232,7 +233,8 @@ int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *re
|
|||
code = tRealloc(&reader->config->bufArr[0], record->blockSize);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize);
|
||||
code =
|
||||
tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockSize, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]);
|
||||
|
@ -257,8 +259,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
|
|||
code = tRealloc(&reader->config->bufArr[0], record->blockKeySize);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code =
|
||||
tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize);
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize,
|
||||
0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// hdr
|
||||
|
@ -296,10 +298,46 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
|
||||
reader->config->bufArr[0], hdr->szBlkCol);
|
||||
reader->config->bufArr[0], hdr->szBlkCol, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int64_t szHint = 0;
|
||||
if (bData->nColData > 3) {
|
||||
int64_t offset = 0;
|
||||
SBlockCol bc = {.cid = 0};
|
||||
SBlockCol *blockCol = &bc;
|
||||
|
||||
size = 0;
|
||||
SColData *colData = tBlockDataGetColDataByIdx(bData, 0);
|
||||
while (blockCol && blockCol->cid < colData->cid) {
|
||||
if (size < hdr->szBlkCol) {
|
||||
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
|
||||
} else {
|
||||
ASSERT(size == hdr->szBlkCol);
|
||||
blockCol = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (blockCol && blockCol->flag == HAS_VALUE) {
|
||||
offset = blockCol->offset;
|
||||
|
||||
SColData *colDataEnd = tBlockDataGetColDataByIdx(bData, bData->nColData - 1);
|
||||
while (blockCol && blockCol->cid < colDataEnd->cid) {
|
||||
if (size < hdr->szBlkCol) {
|
||||
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
|
||||
} else {
|
||||
ASSERT(size == hdr->szBlkCol);
|
||||
blockCol = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (blockCol && blockCol->flag == HAS_VALUE) {
|
||||
szHint = blockCol->offset + blockCol->szBitmap + blockCol->szOffset + blockCol->szValue - offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SBlockCol bc[1] = {{.cid = 0}};
|
||||
SBlockCol *blockCol = bc;
|
||||
|
||||
|
@ -338,7 +376,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
|
|||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA],
|
||||
record->blockOffset + record->blockKeySize + hdr->szBlkCol + blockCol->offset,
|
||||
reader->config->bufArr[1], size1);
|
||||
reader->config->bufArr[1], size1, i > 0 ? 0 : szHint);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
|
||||
|
@ -366,7 +404,7 @@ int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *rec
|
|||
code = tRealloc(&reader->config->bufArr[0], record->smaSize);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize);
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// decode sma data
|
||||
|
@ -405,7 +443,7 @@ int32_t tsdbDataFileReadTombBlk(SDataFileReader *reader, const TTombBlkArray **t
|
|||
}
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], reader->tombFooter->tombBlkPtr->offset, data,
|
||||
reader->tombFooter->tombBlkPtr->size);
|
||||
reader->tombFooter->tombBlkPtr->size, 0);
|
||||
if (code) {
|
||||
taosMemoryFree(data);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -435,7 +473,8 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
|
|||
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
|
||||
code =
|
||||
tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
int32_t size = 0;
|
||||
|
@ -488,8 +527,8 @@ struct SDataFileWriter {
|
|||
STombBlock tombBlock[1];
|
||||
int32_t tombBlockIdx;
|
||||
// range
|
||||
SVersionRange range;
|
||||
SVersionRange tombRange;
|
||||
SVersionRange range;
|
||||
SVersionRange tombRange;
|
||||
} ctx[1];
|
||||
|
||||
STFile files[TSDB_FTYPE_MAX];
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef struct SFDataPtr {
|
|||
extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD);
|
||||
extern void tsdbCloseFile(STsdbFD **ppFD);
|
||||
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
|
||||
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size);
|
||||
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint);
|
||||
extern int32_t tsdbFsyncFile(STsdbFD *pFD);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -901,7 +901,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
if (mtime < now - tsS3UploadDelaySec) {
|
||||
skipMerge = true;
|
||||
}
|
||||
} else if (s3Size(object_name) > 0) {
|
||||
} else /* if (s3Size(object_name) > 0) */ {
|
||||
skipMerge = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -568,7 +568,7 @@ static int32_t tsdbMerge(void *arg) {
|
|||
if (mtime < now - tsS3UploadDelaySec) {
|
||||
skipMerge = true;
|
||||
}
|
||||
} else if (s3Size(object_name) > 0) {
|
||||
} else /* if (s3Size(object_name) > 0) */ {
|
||||
skipMerge = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,17 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
|||
if (pFD->pFD == NULL) {
|
||||
int errsv = errno;
|
||||
const char *object_name = taosDirEntryBaseName((char *)path);
|
||||
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
|
||||
long s3_size = 0;
|
||||
if (tsS3Enabled) {
|
||||
long size = s3Size(object_name);
|
||||
if (size < 0) {
|
||||
code = terrno = TSDB_CODE_FAILED_TO_CONNECT_S3;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
s3_size = size;
|
||||
}
|
||||
|
||||
if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
|
||||
#ifndef S3_BLOCK_CACHE
|
||||
s3EvictCache(path, s3_size);
|
||||
|
@ -48,6 +58,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
|||
// pFD->szFile = s3_size;
|
||||
#endif
|
||||
} else {
|
||||
tsdbInfo("no file: %s", path);
|
||||
code = TAOS_SYSTEM_ERROR(errsv);
|
||||
// taosMemoryFree(pFD);
|
||||
goto _exit;
|
||||
|
@ -283,7 +294,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
|
||||
int32_t code = 0;
|
||||
int64_t n = 0;
|
||||
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
|
||||
|
@ -330,7 +341,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
|||
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
|
||||
|
||||
n += nRead;
|
||||
pgno++;
|
||||
++pgno;
|
||||
bOffset = 0;
|
||||
}
|
||||
|
||||
|
@ -339,7 +350,12 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
|||
uint8_t *pBlock = NULL;
|
||||
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||
int64_t pgnoEnd = pgno - 1 + (bOffset + size - n + szPgCont - 1) / szPgCont;
|
||||
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
||||
|
||||
if (szHint > 0) {
|
||||
pgnoEnd = pgno - 1 + (bOffset + szHint - n + szPgCont - 1) / szPgCont;
|
||||
}
|
||||
|
||||
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
||||
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _exit;
|
||||
|
@ -350,6 +366,10 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
|||
for (int i = 0; i < nPage; ++i) {
|
||||
tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
|
||||
|
||||
if (szHint > 0 && n >= size) {
|
||||
++pgno;
|
||||
continue;
|
||||
}
|
||||
memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
|
||||
|
||||
// check
|
||||
|
@ -364,7 +384,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
|||
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
|
||||
|
||||
n += nRead;
|
||||
pgno++;
|
||||
++pgno;
|
||||
bOffset = 0;
|
||||
}
|
||||
|
||||
|
@ -375,7 +395,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
|
||||
int32_t code = 0;
|
||||
if (!pFD->pFD) {
|
||||
code = tsdbOpenFileImpl(pFD);
|
||||
|
@ -385,7 +405,7 @@ int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size)
|
|||
}
|
||||
|
||||
if (pFD->s3File && tsS3BlockSize < 0) {
|
||||
return tsdbReadFileS3(pFD, offset, pBuf, size);
|
||||
return tsdbReadFileS3(pFD, offset, pBuf, size, szHint);
|
||||
} else {
|
||||
return tsdbReadFileImp(pFD, offset, pBuf, size);
|
||||
}
|
||||
|
@ -1141,7 +1161,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
|
||||
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
|
@ -1178,7 +1198,7 @@ int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size);
|
||||
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
|
@ -1211,7 +1231,7 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
|
||||
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
|
@ -1242,7 +1262,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aCol
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size);
|
||||
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
|
@ -1276,7 +1296,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
|
|||
code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
|
||||
code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
SDiskDataHdr hdr;
|
||||
|
@ -1322,7 +1342,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
|
|||
code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol);
|
||||
code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol, 0);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
@ -1366,7 +1386,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
|
|||
code = tRealloc(&pReader->aBuf[1], size);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
|
||||
code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
|
||||
|
@ -1392,7 +1412,7 @@ int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockDat
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock);
|
||||
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// decmpr
|
||||
|
@ -1444,7 +1464,7 @@ int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock);
|
||||
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// decmpr
|
||||
|
@ -1700,7 +1720,7 @@ int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelDa
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
|
||||
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// // decode
|
||||
|
@ -1740,7 +1760,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
|||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
|
||||
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
|
|
|
@ -60,7 +60,7 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
|
|||
int64_t offset = config->file->size - sizeof(SSttFooter);
|
||||
ASSERT(offset >= TSDB_FHDR_SIZE);
|
||||
|
||||
code = tsdbReadFile(reader[0]->fd, offset, (uint8_t *)(reader[0]->footer), sizeof(SSttFooter));
|
||||
code = tsdbReadFile(reader[0]->fd, offset, (uint8_t *)(reader[0]->footer), sizeof(SSttFooter), 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
|
@ -97,7 +97,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray *
|
|||
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t code =
|
||||
tsdbReadFile(reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size);
|
||||
tsdbReadFile(reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size, 0);
|
||||
if (code) {
|
||||
taosMemoryFree(data);
|
||||
return code;
|
||||
|
@ -125,7 +125,7 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom
|
|||
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t code =
|
||||
tsdbReadFile(reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size);
|
||||
tsdbReadFile(reader->fd, reader->footer->tombBlkPtr->offset, data, reader->footer->tombBlkPtr->size, 0);
|
||||
if (code) {
|
||||
taosMemoryFree(data);
|
||||
return code;
|
||||
|
@ -152,7 +152,8 @@ int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBl
|
|||
void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
|
||||
if (!data) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t code = tsdbReadFile(reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size);
|
||||
int32_t code =
|
||||
tsdbReadFile(reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size, 0);
|
||||
if (code) {
|
||||
taosMemoryFree(data);
|
||||
return code;
|
||||
|
@ -177,7 +178,7 @@ int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk,
|
|||
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szBlock);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock);
|
||||
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tDecmprBlockData(reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData, &reader->config->bufArr[1]);
|
||||
|
@ -209,7 +210,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
|
|||
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey);
|
||||
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// hdr
|
||||
|
@ -255,7 +256,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, reader->config->bufArr[0],
|
||||
hdr->szBlkCol);
|
||||
hdr->szBlkCol, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -296,7 +297,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
|
||||
reader->config->bufArr[1], size1);
|
||||
reader->config->bufArr[1], size1, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
|
||||
|
@ -321,7 +322,7 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
|
|||
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size);
|
||||
code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0);
|
||||
if (code) TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
int64_t size = 0;
|
||||
|
@ -352,7 +353,7 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
|
|||
code = tRealloc(&reader->config->bufArr[0], statisBlk->dp->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size);
|
||||
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size, 0);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
int64_t size = 0;
|
||||
|
@ -405,7 +406,7 @@ struct SSttFileWriter {
|
|||
};
|
||||
|
||||
static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
|
||||
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
|
||||
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
|
||||
if (blockData->nRow == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -101,6 +101,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing d
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DATA_FMT, "Invalid data format")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
|
||||
|
||||
//client
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
||||
|
|
Loading…
Reference in New Issue