more code
This commit is contained in:
parent
5ea8f50ec6
commit
0e4b0459bd
|
@ -262,7 +262,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
|||
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
|
||||
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
|
||||
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
|
||||
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
|
||||
int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx);
|
||||
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
|
||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
||||
int8_t cmprAlg, int8_t toLast);
|
||||
|
@ -272,7 +272,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
|
|||
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
|
||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
|
||||
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
|
||||
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk);
|
||||
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
|
||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
|
||||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
|
||||
|
|
|
@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
}
|
||||
|
||||
tMapDataReset(&state->blockMap);
|
||||
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap);
|
||||
code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap);
|
||||
if (code) goto _err;
|
||||
|
||||
state->nBlock = state->blockMap.nItem;
|
||||
|
|
|
@ -377,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
|
|||
pCommitter->dReader.pBlockIdx =
|
||||
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
|
||||
|
||||
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
|
||||
code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
|
||||
if (code) goto _exit;
|
||||
|
||||
ASSERT(pCommitter->dReader.mBlock.nItem > 0);
|
||||
|
@ -493,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
pCommitter->dReader.iBlockIdx = 0;
|
||||
if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
|
||||
pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
|
||||
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
|
||||
code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
pCommitter->dReader.pBlockIdx = NULL;
|
||||
|
@ -688,7 +688,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
|
|||
|
||||
while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
|
||||
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
|
||||
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
|
||||
code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
|
||||
|
@ -1451,7 +1451,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
|||
// end
|
||||
if (pCommitter->dWriter.mBlock.nItem > 0) {
|
||||
SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
|
||||
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
|
||||
code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "osDef.h"
|
||||
#include "tsdb.h"
|
||||
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
|
||||
typedef enum {
|
||||
EXTERNAL_ROWS_PREV = 0x1,
|
||||
|
@ -81,11 +81,11 @@ typedef struct SBlockLoadSuppInfo {
|
|||
} SBlockLoadSuppInfo;
|
||||
|
||||
typedef struct SLastBlockReader {
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int32_t order;
|
||||
uint64_t uid;
|
||||
SMergeTree mergeTree;
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int32_t order;
|
||||
uint64_t uid;
|
||||
SMergeTree mergeTree;
|
||||
SSttBlockLoadInfo* pInfo;
|
||||
} SLastBlockReader;
|
||||
|
||||
|
@ -229,10 +229,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
|||
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||
int64_t skey = pTsdbReader->window.skey;
|
||||
info.lastKey = (skey > INT64_MIN)? (skey - 1):skey;
|
||||
info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
} else {
|
||||
int64_t ekey = pTsdbReader->window.ekey;
|
||||
info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey;
|
||||
info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
}
|
||||
|
||||
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
||||
|
@ -596,7 +596,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
|
||||
|
||||
tMapDataReset(&pScanInfo->mapData);
|
||||
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||
|
||||
sizeInDisk += pScanInfo->mapData.nData;
|
||||
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||
|
@ -1385,7 +1385,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
|||
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
||||
bool mergeBlockData) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
|
||||
STSRow* pTSRow = NULL;
|
||||
SRowMerger merge = {0};
|
||||
|
@ -1886,7 +1886,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
initMemDataIterator(pScanInfo, pReader);
|
||||
pLBlockReader->uid = pScanInfo->uid;
|
||||
|
||||
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1;
|
||||
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
|
||||
STimeWindow w = pLBlockReader->window;
|
||||
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
||||
w.skey = pScanInfo->lastKey + step;
|
||||
|
@ -3559,7 +3559,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||
|
||||
int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1;
|
||||
int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
||||
resetDataBlockScanInfo(pReader->status.pTableMap, ts);
|
||||
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -418,21 +418,21 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) {
|
||||
int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx) {
|
||||
int32_t code = 0;
|
||||
SHeadFile *pHeadFile = &pWriter->fHead;
|
||||
int64_t size;
|
||||
int64_t n;
|
||||
|
||||
ASSERT(mBlock->nItem > 0);
|
||||
ASSERT(mDataBlk->nItem > 0);
|
||||
|
||||
// alloc
|
||||
size = tPutMapData(NULL, mBlock);
|
||||
size = tPutMapData(NULL, mDataBlk);
|
||||
code = tRealloc(&pWriter->aBuf[0], size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
n = tPutMapData(pWriter->aBuf[0], mBlock);
|
||||
n = tPutMapData(pWriter->aBuf[0], mDataBlk);
|
||||
|
||||
// write
|
||||
code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
|
||||
|
@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc
|
|||
tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
|
||||
" size:%" PRId64 " nItem:%d",
|
||||
TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
|
||||
pBlockIdx->offset, pBlockIdx->size, mBlock->nItem);
|
||||
pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
@ -872,7 +872,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) {
|
||||
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pBlockIdx->offset;
|
||||
int64_t size = pBlockIdx->size;
|
||||
|
@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
|
|||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
int64_t n = tGetMapData(pReader->aBuf[0], mBlock);
|
||||
int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
|
||||
if (n < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
|
|
@ -91,7 +91,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
|
|||
for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
|
||||
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
|
||||
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
|
||||
|
@ -211,7 +211,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
|||
if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;
|
||||
|
||||
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
|
||||
code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
|
||||
if (code) goto _err;
|
||||
pIter->iBlock = -1;
|
||||
}
|
||||
|
@ -725,7 +725,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
|
|||
}
|
||||
|
||||
// SDataBlk
|
||||
// code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
|
||||
// code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
|
||||
// if (code) goto _err;
|
||||
|
||||
// SBlockIdx
|
||||
|
@ -747,7 +747,7 @@ _err:
|
|||
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
|
||||
int32_t code = 0;
|
||||
|
||||
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock);
|
||||
code = tsdbReadDataBlk(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
// SBlockData
|
||||
|
@ -774,7 +774,7 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
|
|||
|
||||
// SDataBlk
|
||||
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
|
||||
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
|
||||
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
// SBlockIdx
|
||||
|
@ -987,7 +987,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
|
|||
}
|
||||
|
||||
if (pWriter->pBlockIdx) {
|
||||
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock);
|
||||
code = tsdbReadDataBlk(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
tMapDataReset(&pWriter->mBlock);
|
||||
|
@ -1074,18 +1074,55 @@ _err:
|
|||
}
|
||||
#endif
|
||||
|
||||
static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter);
|
||||
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
|
||||
pWriter->dReader.iBlockIdx++;
|
||||
if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
|
||||
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
|
||||
|
||||
code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
|
||||
if (code) goto _exit;
|
||||
|
||||
pWriter->dReader.iDataBlk = -1;
|
||||
tBlockDataReset(&pWriter->dReader.bData);
|
||||
pWriter->dReader.iRow = 0;
|
||||
} else {
|
||||
pWriter->dReader.pBlockIdx = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
|
||||
int32_t code = 0;
|
||||
|
||||
while (true) {
|
||||
if (pWriter->dReader.pBlockIdx == NULL) break;
|
||||
if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
|
||||
|
||||
SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
|
||||
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
|
||||
if (code) goto _exit;
|
||||
|
||||
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = tsdbSnapNextTableData(pWriter);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pWriter->pTsdb;
|
||||
|
||||
// close last file if need
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
ASSERT(fid > pWriter->fid);
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
ASSERT(pWriter->dWriter.pWriter == NULL);
|
||||
|
||||
// open new
|
||||
|
@ -1139,49 +1176,18 @@ _err:
|
|||
|
||||
static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
|
||||
int32_t code = 0;
|
||||
ASSERT(pWriter->dWriter.pWriter);
|
||||
|
||||
pWriter->dReader.iBlockIdx++;
|
||||
if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
|
||||
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
|
||||
// (todo)
|
||||
|
||||
code = tsdbReadBlock(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
|
||||
if (code) goto _exit;
|
||||
// copy remain table data
|
||||
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
|
||||
code = tsdbSnapWriteCopyData(pWriter, &id);
|
||||
if (code) goto _exit;
|
||||
|
||||
pWriter->dReader.iDataBlk = -1;
|
||||
tBlockDataReset(&pWriter->dReader.bData);
|
||||
pWriter->dReader.iRow = 0;
|
||||
} else {
|
||||
pWriter->dReader.pBlockIdx = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
|
||||
int32_t code = 0;
|
||||
|
||||
while (true) {
|
||||
if (pWriter->dReader.pBlockIdx == NULL) break;
|
||||
if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
|
||||
|
||||
SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
|
||||
code = tsdbWriteBlock(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
|
||||
if (code) goto _exit;
|
||||
|
||||
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = tsdbSnapNextTableData(pWriter);
|
||||
if (code) goto _exit;
|
||||
if (pWriter->dWriter.sData.nRow > 0) {
|
||||
// TODO: write the last block
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -1299,10 +1305,17 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
|
||||
// Loop to handle each row
|
||||
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
||||
// open file if need
|
||||
TSKEY ts = pBlockData->aTSKEY[iRow];
|
||||
int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision);
|
||||
|
||||
if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) {
|
||||
if (pWriter->dWriter.pWriter) {
|
||||
ASSERT(fid > pWriter->fid);
|
||||
|
||||
code = tsdbSnapWriteCloseFile(pWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbSnapWriteOpenFile(pWriter, fid);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
@ -1311,8 +1324,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
|
||||
// TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
|
Loading…
Reference in New Issue