commit
37109d561c
|
@ -319,7 +319,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||||
SDataCols *tdFreeDataCols(SDataCols *pCols);
|
SDataCols *tdFreeDataCols(SDataCols *pCols);
|
||||||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset);
|
||||||
|
|
||||||
// ----------------- K-V data row structure
|
// ----------------- K-V data row structure
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -441,30 +441,35 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
|
||||||
pCols->numOfRows++;
|
pCols->numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) {
|
||||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
||||||
ASSERT(target->numOfCols == source->numOfCols);
|
ASSERT(target->numOfCols == source->numOfCols);
|
||||||
|
int offset = 0;
|
||||||
|
|
||||||
|
if (pOffset == NULL) {
|
||||||
|
pOffset = &offset;
|
||||||
|
}
|
||||||
|
|
||||||
SDataCols *pTarget = NULL;
|
SDataCols *pTarget = NULL;
|
||||||
|
|
||||||
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
|
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyFirst(source))) { // No overlap
|
||||||
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
||||||
for (int i = 0; i < rowsToMerge; i++) {
|
for (int i = 0; i < rowsToMerge; i++) {
|
||||||
for (int j = 0; j < source->numOfCols; j++) {
|
for (int j = 0; j < source->numOfCols; j++) {
|
||||||
if (source->cols[j].len > 0) {
|
if (source->cols[j].len > 0) {
|
||||||
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
|
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
|
||||||
target->maxPoints);
|
target->maxPoints);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
target->numOfRows++;
|
target->numOfRows++;
|
||||||
}
|
}
|
||||||
|
(*pOffset) += rowsToMerge;
|
||||||
} else {
|
} else {
|
||||||
pTarget = tdDupDataCols(target, true);
|
pTarget = tdDupDataCols(target, true);
|
||||||
if (pTarget == NULL) goto _err;
|
if (pTarget == NULL) goto _err;
|
||||||
|
|
||||||
int iter1 = 0;
|
int iter1 = 0;
|
||||||
int iter2 = 0;
|
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
|
||||||
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
|
|
||||||
pTarget->numOfRows + rowsToMerge);
|
pTarget->numOfRows + rowsToMerge);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,8 @@ typedef struct {
|
||||||
#define TFS_UNDECIDED_ID -1
|
#define TFS_UNDECIDED_ID -1
|
||||||
#define TFS_PRIMARY_LEVEL 0
|
#define TFS_PRIMARY_LEVEL 0
|
||||||
#define TFS_PRIMARY_ID 0
|
#define TFS_PRIMARY_ID 0
|
||||||
|
#define TFS_MIN_LEVEL 0
|
||||||
|
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
|
||||||
|
|
||||||
// FS APIs ====================================
|
// FS APIs ====================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -409,6 +409,9 @@ void tsdbDecCommitRef(int vgId);
|
||||||
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
|
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
|
||||||
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
|
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
|
||||||
|
|
||||||
|
// For TSDB Compact
|
||||||
|
int tsdbCompact(STsdbRepo *pRepo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -6,6 +6,10 @@ AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
ADD_LIBRARY(tsdb ${SRC})
|
ADD_LIBRARY(tsdb ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
|
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
|
||||||
|
|
||||||
|
IF (TD_TSDB_PLUGINS)
|
||||||
|
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
IF (TD_LINUX)
|
IF (TD_LINUX)
|
||||||
# Someone has no gtest directory, so comment it
|
# Someone has no gtest directory, so comment it
|
||||||
# ADD_SUBDIRECTORY(tests)
|
# ADD_SUBDIRECTORY(tests)
|
||||||
|
|
|
@ -29,10 +29,17 @@ typedef struct {
|
||||||
int64_t size;
|
int64_t size;
|
||||||
} SKVRecord;
|
} SKVRecord;
|
||||||
|
|
||||||
|
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
|
||||||
|
|
||||||
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
|
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
|
||||||
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
|
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
|
||||||
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo);
|
void *tsdbCommitData(STsdbRepo *pRepo);
|
||||||
|
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
||||||
|
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
|
||||||
|
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
||||||
|
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
|
||||||
|
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
|
||||||
int tsdbApplyRtn(STsdbRepo *pRepo);
|
int tsdbApplyRtn(STsdbRepo *pRepo);
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
|
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
|
||||||
#define _TD_TSDB_COMMIT_QUEUE_H_
|
#define _TD_TSDB_COMMIT_QUEUE_H_
|
||||||
|
|
||||||
int tsdbScheduleCommit(STsdbRepo *pRepo);
|
typedef enum { COMMIT_REQ, COMPACT_REQ } TSDB_REQ_T;
|
||||||
|
|
||||||
|
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
|
||||||
|
|
||||||
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
|
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef _TD_TSDB_COMPACT_H_
|
||||||
|
#define _TD_TSDB_COMPACT_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
void *tsdbCompactImpl(STsdbRepo *pRepo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_COMPACT_H_ */
|
|
@ -64,6 +64,8 @@ extern "C" {
|
||||||
#include "tsdbReadImpl.h"
|
#include "tsdbReadImpl.h"
|
||||||
// Commit
|
// Commit
|
||||||
#include "tsdbCommit.h"
|
#include "tsdbCommit.h"
|
||||||
|
// Compact
|
||||||
|
#include "tsdbCompact.h"
|
||||||
// Commit Queue
|
// Commit Queue
|
||||||
#include "tsdbCommitQueue.h"
|
#include "tsdbCommitQueue.h"
|
||||||
// Main definitions
|
// Main definitions
|
||||||
|
|
|
@ -51,7 +51,7 @@ typedef struct {
|
||||||
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
|
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
|
||||||
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
|
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
|
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5)
|
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
|
||||||
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
||||||
|
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||||
|
@ -72,7 +72,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid);
|
||||||
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
|
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
|
||||||
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
|
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
|
||||||
static int tsdbWriteBlockInfo(SCommitH *pCommih);
|
static int tsdbWriteBlockInfo(SCommitH *pCommih);
|
||||||
static int tsdbWriteBlockIdx(SCommitH *pCommih);
|
|
||||||
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
|
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
|
||||||
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
|
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
|
||||||
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
|
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
|
||||||
|
@ -86,7 +85,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
|
||||||
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
|
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
|
||||||
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
||||||
TSKEY maxKey, int maxRows, int8_t update);
|
TSKEY maxKey, int maxRows, int8_t update);
|
||||||
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
|
||||||
|
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo) {
|
void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||||
if (pRepo->imem == NULL) {
|
if (pRepo->imem == NULL) {
|
||||||
|
@ -117,6 +115,151 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
||||||
|
SDiskID did;
|
||||||
|
SDFileSet nSet;
|
||||||
|
STsdbFS * pfs = REPO_FS(pRepo);
|
||||||
|
int level;
|
||||||
|
|
||||||
|
ASSERT(pSet->fid >= pRtn->minFid);
|
||||||
|
|
||||||
|
level = tsdbGetFidLevel(pSet->fid, pRtn);
|
||||||
|
|
||||||
|
tfsAllocDisk(level, &(did.level), &(did.id));
|
||||||
|
if (did.level == TFS_UNDECIDED_LEVEL) {
|
||||||
|
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (did.level > TSDB_FSET_LEVEL(pSet)) {
|
||||||
|
// Need to move the FSET to higher level
|
||||||
|
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
|
||||||
|
|
||||||
|
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
||||||
|
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
|
||||||
|
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
|
||||||
|
} else {
|
||||||
|
// On a correct level
|
||||||
|
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
|
||||||
|
SBlockIdx *pIdx) {
|
||||||
|
size_t nSupBlocks;
|
||||||
|
size_t nSubBlocks;
|
||||||
|
uint32_t tlen;
|
||||||
|
SBlockInfo *pBlkInfo;
|
||||||
|
int64_t offset;
|
||||||
|
SBlock * pBlock;
|
||||||
|
|
||||||
|
memset(pIdx, 0, sizeof(*pIdx));
|
||||||
|
|
||||||
|
nSupBlocks = taosArrayGetSize(pSupA);
|
||||||
|
nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
|
||||||
|
|
||||||
|
if (nSupBlocks <= 0) {
|
||||||
|
// No data (data all deleted)
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
|
||||||
|
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
|
||||||
|
pBlkInfo = *ppBuf;
|
||||||
|
|
||||||
|
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
|
||||||
|
pBlkInfo->tid = TABLE_TID(pTable);
|
||||||
|
pBlkInfo->uid = TABLE_UID(pTable);
|
||||||
|
|
||||||
|
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
|
||||||
|
if (nSubBlocks > 0) {
|
||||||
|
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
|
||||||
|
|
||||||
|
for (int i = 0; i < nSupBlocks; i++) {
|
||||||
|
pBlock = pBlkInfo->blocks + i;
|
||||||
|
|
||||||
|
if (pBlock->numOfSubBlocks > 1) {
|
||||||
|
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
|
||||||
|
|
||||||
|
if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
|
||||||
|
|
||||||
|
// Set pIdx
|
||||||
|
pBlock = taosArrayGetLast(pSupA);
|
||||||
|
|
||||||
|
pIdx->tid = TABLE_TID(pTable);
|
||||||
|
pIdx->uid = TABLE_UID(pTable);
|
||||||
|
pIdx->hasLast = pBlock->last ? 1 : 0;
|
||||||
|
pIdx->maxKey = pBlock->keyLast;
|
||||||
|
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
|
||||||
|
pIdx->len = tlen;
|
||||||
|
pIdx->offset = (uint32_t)offset;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
|
||||||
|
SBlockIdx *pBlkIdx;
|
||||||
|
size_t nidx = taosArrayGetSize(pIdxA);
|
||||||
|
int tlen = 0, size;
|
||||||
|
int64_t offset;
|
||||||
|
|
||||||
|
if (nidx <= 0) {
|
||||||
|
// All data are deleted
|
||||||
|
pHeadf->info.offset = 0;
|
||||||
|
pHeadf->info.len = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t i = 0; i < nidx; i++) {
|
||||||
|
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
|
||||||
|
|
||||||
|
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
|
||||||
|
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
|
||||||
|
|
||||||
|
void *ptr = POINTER_SHIFT(*ppBuf, tlen);
|
||||||
|
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
|
||||||
|
|
||||||
|
tlen += size;
|
||||||
|
}
|
||||||
|
|
||||||
|
tlen += sizeof(TSCKSUM);
|
||||||
|
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
|
||||||
|
|
||||||
|
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
|
||||||
|
pHeadf->info.offset = (uint32_t)offset;
|
||||||
|
pHeadf->info.len = tlen;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// =================== Commit Meta Data
|
// =================== Commit Meta Data
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
STsdbFS * pfs = REPO_FS(pRepo);
|
STsdbFS * pfs = REPO_FS(pRepo);
|
||||||
|
@ -446,7 +589,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbWriteBlockIdx(pCommith) < 0) {
|
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
|
||||||
|
0) {
|
||||||
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
tsdbCloseCommitFile(pCommith, true);
|
tsdbCloseCommitFile(pCommith, true);
|
||||||
// revert the file change
|
// revert the file change
|
||||||
|
@ -754,23 +898,21 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
|
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
|
||||||
bool isSuper) {
|
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) {
|
||||||
STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith);
|
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
SBlockData *pBlockData;
|
SBlockData *pBlockData;
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
STable * pTable = TSDB_COMMIT_TABLE(pCommith);
|
|
||||||
int rowsToWrite = pDataCols->numOfRows;
|
int rowsToWrite = pDataCols->numOfRows;
|
||||||
|
|
||||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||||
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
|
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
|
||||||
|
|
||||||
// Make buffer space
|
// Make buffer space
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
|
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith);
|
pBlockData = (SBlockData *)(*ppBuf);
|
||||||
|
|
||||||
// Get # of cols not all NULL(not including key column)
|
// Get # of cols not all NULL(not including key column)
|
||||||
int nColsNotAllNull = 0;
|
int nColsNotAllNull = 0;
|
||||||
|
@ -816,23 +958,23 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
|
||||||
void * tptr;
|
void * tptr;
|
||||||
|
|
||||||
// Make room
|
// Make room
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
|
if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith);
|
pBlockData = (SBlockData *)(*ppBuf);
|
||||||
pBlockCol = pBlockData->cols + tcol;
|
pBlockCol = pBlockData->cols + tcol;
|
||||||
tptr = POINTER_SHIFT(pBlockData, lsize);
|
tptr = POINTER_SHIFT(pBlockData, lsize);
|
||||||
|
|
||||||
if (pCfg->compression == TWO_STAGE_COMP &&
|
if (pCfg->compression == TWO_STAGE_COMP &&
|
||||||
tsdbMakeRoom((void **)(&TSDB_COMMIT_COMP_BUF(pCommith)), tlen + COMP_OVERFLOW_BYTES) < 0) {
|
tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compress or just copy
|
// Compress or just copy
|
||||||
if (pCfg->compression) {
|
if (pCfg->compression) {
|
||||||
flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
||||||
tlen + COMP_OVERFLOW_BYTES, pCfg->compression,
|
tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
|
||||||
TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES);
|
tlen + COMP_OVERFLOW_BYTES);
|
||||||
} else {
|
} else {
|
||||||
flen = tlen;
|
flen = tlen;
|
||||||
memcpy(tptr, pDataCol->pData, flen);
|
memcpy(tptr, pDataCol->pData, flen);
|
||||||
|
@ -888,68 +1030,27 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
|
||||||
|
bool isSuper) {
|
||||||
|
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
|
||||||
|
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
|
||||||
|
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
|
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
|
||||||
SBlockIdx blkIdx;
|
SBlockIdx blkIdx;
|
||||||
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
|
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
|
||||||
SBlock * pBlock;
|
|
||||||
size_t nSupBlocks;
|
|
||||||
size_t nSubBlocks;
|
|
||||||
uint32_t tlen;
|
|
||||||
SBlockInfo *pBlkInfo;
|
|
||||||
int64_t offset;
|
|
||||||
|
|
||||||
nSupBlocks = taosArrayGetSize(pCommih->aSupBlk);
|
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
|
||||||
nSubBlocks = taosArrayGetSize(pCommih->aSubBlk);
|
&blkIdx) < 0) {
|
||||||
|
|
||||||
if (nSupBlocks <= 0) {
|
|
||||||
// No data (data all deleted)
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
|
|
||||||
|
|
||||||
// Write SBlockInfo part
|
|
||||||
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
|
|
||||||
pBlkInfo = TSDB_COMMIT_BUF(pCommih);
|
|
||||||
|
|
||||||
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
|
|
||||||
pBlkInfo->tid = TABLE_TID(pTable);
|
|
||||||
pBlkInfo->uid = TABLE_UID(pTable);
|
|
||||||
|
|
||||||
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pCommih->aSupBlk, 0), nSupBlocks * sizeof(SBlock));
|
|
||||||
if (nSubBlocks > 0) {
|
|
||||||
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pCommih->aSubBlk, 0), nSubBlocks * sizeof(SBlock));
|
|
||||||
|
|
||||||
for (int i = 0; i < nSupBlocks; i++) {
|
|
||||||
pBlock = pBlkInfo->blocks + i;
|
|
||||||
|
|
||||||
if (pBlock->numOfSubBlocks > 1) {
|
|
||||||
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
|
|
||||||
|
|
||||||
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
|
if (blkIdx.numOfBlocks == 0) {
|
||||||
|
return 0;
|
||||||
// Set blkIdx
|
}
|
||||||
pBlock = taosArrayGet(pCommih->aSupBlk, nSupBlocks - 1);
|
|
||||||
|
|
||||||
blkIdx.tid = TABLE_TID(pTable);
|
|
||||||
blkIdx.uid = TABLE_UID(pTable);
|
|
||||||
blkIdx.hasLast = pBlock->last ? 1 : 0;
|
|
||||||
blkIdx.maxKey = pBlock->keyLast;
|
|
||||||
blkIdx.numOfBlocks = (uint32_t)nSupBlocks;
|
|
||||||
blkIdx.len = tlen;
|
|
||||||
blkIdx.offset = (uint32_t)offset;
|
|
||||||
|
|
||||||
ASSERT(blkIdx.numOfBlocks > 0);
|
|
||||||
|
|
||||||
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
|
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -959,49 +1060,6 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbWriteBlockIdx(SCommitH *pCommih) {
|
|
||||||
SBlockIdx *pBlkIdx = NULL;
|
|
||||||
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
|
|
||||||
size_t nidx = taosArrayGetSize(pCommih->aBlkIdx);
|
|
||||||
int tlen = 0, size = 0;
|
|
||||||
int64_t offset = 0;
|
|
||||||
|
|
||||||
if (nidx <= 0) {
|
|
||||||
// All data are deleted
|
|
||||||
pHeadf->info.offset = 0;
|
|
||||||
pHeadf->info.len = 0;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t i = 0; i < nidx; i++) {
|
|
||||||
pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i);
|
|
||||||
|
|
||||||
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
|
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen + size) < 0) return -1;
|
|
||||||
|
|
||||||
void *ptr = POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen);
|
|
||||||
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
|
|
||||||
|
|
||||||
tlen += size;
|
|
||||||
}
|
|
||||||
|
|
||||||
tlen += sizeof(TSCKSUM);
|
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen) < 0) return -1;
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)TSDB_COMMIT_BUF(pCommih), tlen);
|
|
||||||
|
|
||||||
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < tlen) {
|
|
||||||
tsdbError("vgId:%d failed to write block index part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih),
|
|
||||||
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen - sizeof(TSCKSUM)));
|
|
||||||
pHeadf->info.offset = (uint32_t)offset;
|
|
||||||
pHeadf->info.len = tlen;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
|
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
|
||||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
|
@ -1454,45 +1512,3 @@ int tsdbApplyRtn(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
|
||||||
SDiskID did;
|
|
||||||
SDFileSet nSet;
|
|
||||||
STsdbFS * pfs = REPO_FS(pRepo);
|
|
||||||
int level;
|
|
||||||
|
|
||||||
ASSERT(pSet->fid >= pRtn->minFid);
|
|
||||||
|
|
||||||
level = tsdbGetFidLevel(pSet->fid, pRtn);
|
|
||||||
|
|
||||||
tfsAllocDisk(level, &(did.level), &(did.id));
|
|
||||||
if (did.level == TFS_UNDECIDED_LEVEL) {
|
|
||||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (did.level > TSDB_FSET_LEVEL(pSet)) {
|
|
||||||
// Need to move the FSET to higher level
|
|
||||||
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
|
|
||||||
|
|
||||||
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
|
||||||
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
|
|
||||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
|
|
||||||
} else {
|
|
||||||
// On a correct level
|
|
||||||
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -26,8 +26,9 @@ typedef struct {
|
||||||
} SCommitQueue;
|
} SCommitQueue;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
TSDB_REQ_T req;
|
||||||
STsdbRepo *pRepo;
|
STsdbRepo *pRepo;
|
||||||
} SCommitReq;
|
} SReq;
|
||||||
|
|
||||||
static void *tsdbLoopCommit(void *arg);
|
static void *tsdbLoopCommit(void *arg);
|
||||||
|
|
||||||
|
@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() {
|
||||||
pthread_mutex_destroy(&(pQueue->lock));
|
pthread_mutex_destroy(&(pQueue->lock));
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbScheduleCommit(STsdbRepo *pRepo) {
|
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
|
||||||
SCommitQueue *pQueue = &tsCommitQueue;
|
SCommitQueue *pQueue = &tsCommitQueue;
|
||||||
|
|
||||||
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq));
|
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
((SCommitReq *)pNode->data)->pRepo = pRepo;
|
((SReq *)pNode->data)->req = req;
|
||||||
|
((SReq *)pNode->data)->pRepo = pRepo;
|
||||||
|
|
||||||
pthread_mutex_lock(&(pQueue->lock));
|
pthread_mutex_lock(&(pQueue->lock));
|
||||||
|
|
||||||
|
@ -154,6 +156,7 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
SCommitQueue *pQueue = &tsCommitQueue;
|
SCommitQueue *pQueue = &tsCommitQueue;
|
||||||
SListNode * pNode = NULL;
|
SListNode * pNode = NULL;
|
||||||
STsdbRepo * pRepo = NULL;
|
STsdbRepo * pRepo = NULL;
|
||||||
|
TSDB_REQ_T req;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
pthread_mutex_lock(&(pQueue->lock));
|
pthread_mutex_lock(&(pQueue->lock));
|
||||||
|
@ -174,14 +177,22 @@ static void *tsdbLoopCommit(void *arg) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&(pQueue->lock));
|
pthread_mutex_unlock(&(pQueue->lock));
|
||||||
|
|
||||||
pRepo = ((SCommitReq *)pNode->data)->pRepo;
|
req = ((SReq *)pNode->data)->req;
|
||||||
|
pRepo = ((SReq *)pNode->data)->pRepo;
|
||||||
|
|
||||||
// check if need to apply new config
|
// check if need to apply new config
|
||||||
if (pRepo->config_changed) {
|
if (pRepo->config_changed) {
|
||||||
tsdbApplyRepoConfig(pRepo);
|
tsdbApplyRepoConfig(pRepo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (req == COMMIT_REQ) {
|
||||||
tsdbCommitData(pRepo);
|
tsdbCommitData(pRepo);
|
||||||
|
} else if (req == COMPACT_REQ) {
|
||||||
|
tsdbCompactImpl(pRepo);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
listNodeFree(pNode);
|
listNodeFree(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,3 +12,11 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
#ifndef _TSDB_PLUGINS
|
||||||
|
|
||||||
|
int tsdbCompact(STsdbRepo *pRepo) { return 0; }
|
||||||
|
void *tsdbCompactImpl(STsdbRepo *pRepo) { return NULL; }
|
||||||
|
|
||||||
|
#endif
|
|
@ -288,7 +288,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||||
pRepo->imem = pRepo->mem;
|
pRepo->imem = pRepo->mem;
|
||||||
pRepo->mem = NULL;
|
pRepo->mem = NULL;
|
||||||
tsdbScheduleCommit(pRepo);
|
tsdbScheduleCommit(pRepo, COMMIT_REQ);
|
||||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -258,7 +258,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
||||||
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
||||||
iBlock++;
|
iBlock++;
|
||||||
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
|
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
|
||||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1;
|
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
|
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
|
||||||
|
@ -284,7 +284,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
||||||
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
for (int i = 1; i < pBlock->numOfSubBlocks; i++) {
|
||||||
iBlock++;
|
iBlock++;
|
||||||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
|
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
|
||||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1;
|
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
|
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
|
||||||
|
|
Loading…
Reference in New Issue